You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/07 22:57:48 UTC

[1/2] git commit: Fix broken build

Repository: cassandra
Updated Branches:
  refs/heads/trunk fe2d7ddaf -> 56c76bebe


Fix broken build


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60dbe8b7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60dbe8b7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60dbe8b7

Branch: refs/heads/trunk
Commit: 60dbe8b700ee0ee1a15fbcb94f9543d1477de7a3
Parents: 0cb1db6
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 7 16:55:53 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 7 16:55:53 2014 -0400

----------------------------------------------------------------------
 .../cql3/statements/ModificationStatement.java  | 338 ++++++++-----------
 1 file changed, 141 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60dbe8b7/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 23f7cfe..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -23,23 +23,18 @@ import java.util.*;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.github.jamm.MemoryMeter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.CASConditions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -54,21 +49,16 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 {
     private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
 
-    private static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
-
-    private static boolean loggedCounterTTL = false;
-    private static boolean loggedCounterTimestamp = false;
-
     public static enum StatementType { INSERT, UPDATE, DELETE }
     public final StatementType type;
 
+    private final int boundTerms;
     public final CFMetaData cfm;
     public final Attributes attrs;
 
     private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
     private final List<Operation> columnOperations = new ArrayList<Operation>();
 
-    private int boundTerms;
     // Separating normal and static conditions makes things somewhat easier
     private List<ColumnCondition> columnConditions;
     private List<ColumnCondition> staticConditions;
@@ -80,17 +70,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     private boolean setsStaticColumns;
     private boolean setsRegularColumns;
 
-    private final Function<ColumnCondition, ColumnIdentifier> getColumnForCondition = new Function<ColumnCondition, ColumnIdentifier>()
+    private final Function<ColumnCondition, ColumnDefinition> getColumnForCondition = new Function<ColumnCondition, ColumnDefinition>()
     {
-        public ColumnIdentifier apply(ColumnCondition cond)
+        public ColumnDefinition apply(ColumnCondition cond)
         {
-            return cond.column.name;
+            return cond.column;
         }
     };
 
-    public ModificationStatement(StatementType type, CFMetaData cfm, Attributes attrs)
+    public ModificationStatement(StatementType type, int boundTerms, CFMetaData cfm, Attributes attrs)
     {
         this.type = type;
+        this.boundTerms = boundTerms;
         this.cfm = cfm;
         this.attrs = attrs;
     }
@@ -106,7 +97,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     }
 
     public abstract boolean requireFullClusteringKey();
-    public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+    public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
 
     public int getBoundTerms()
     {
@@ -125,12 +116,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     public boolean isCounter()
     {
-        return cfm.getDefaultValidator().isCommutative();
+        return cfm.isCounter();
     }
 
-    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+    public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimestamp(now, variables);
+        return attrs.getTimestamp(now, options);
     }
 
     public boolean isTimestampSet()
@@ -138,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return attrs.isTimestampSet();
     }
 
-    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+    public int getTimeToLive(QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimeToLive(variables);
+        return attrs.getTimeToLive(options);
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -155,31 +146,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public void validate(ClientState state) throws InvalidRequestException
     {
         if (hasConditions() && attrs.isTimestampSet())
-            throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+            throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
 
-        if (isCounter())
-        {
-            if (attrs.isTimestampSet() && !loggedCounterTimestamp)
-            {
-                logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This is invalid " +
-                            "because counters do not use timestamps, and the timestamp has been ignored. " +
-                            "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
-                loggedCounterTimestamp = true;
-            }
+        if (isCounter() && attrs.isTimestampSet())
+            throw new InvalidRequestException("Cannot provide custom timestamp for counter updates");
 
-            if (attrs.isTimeToLiveSet() && !loggedCounterTTL)
-            {
-                logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid " +
-                            "because counter tables do not support TTL, and the TTL value has been ignored. " +
-                            "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
-                loggedCounterTTL = true;
-            }
-        }
+        if (isCounter() && attrs.isTimeToLiveSet())
+            throw new InvalidRequestException("Cannot provide custom TTL for counter updates");
     }
 
     public void addOperation(Operation op)
     {
-        if (op.isStatic(cfm))
+        if (op.column.isStatic())
             setsStaticColumns = true;
         else
             setsRegularColumns = true;
@@ -191,19 +169,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return columnOperations;
     }
 
-    public Iterable<ColumnIdentifier> getColumnsWithConditions()
+    public Iterable<ColumnDefinition> getColumnsWithConditions()
     {
         if (ifNotExists || ifExists)
             return null;
 
-        return Iterables.concat(columnConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
-                                staticConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
+        return Iterables.concat(columnConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
+                                staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
     }
 
     public void addCondition(ColumnCondition cond) throws InvalidRequestException
     {
         List<ColumnCondition> conds = null;
-        if (cond.column.kind == CFDefinition.Name.Kind.STATIC)
+        if (cond.column.isStatic())
         {
             setsStaticColumns = true;
             if (staticConditions == null)
@@ -240,45 +218,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return ifExists;
     }
 
-    private void addKeyValues(CFDefinition.Name name, Restriction values) throws InvalidRequestException
+    private void addKeyValues(ColumnDefinition def, Restriction values) throws InvalidRequestException
     {
-        if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
+        if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
             hasNoClusteringColumns = false;
-        if (processedKeys.put(name.name, values) != null)
-            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name.name));
+        if (processedKeys.put(def.name, values) != null)
+            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", def.name));
     }
 
-    public void addKeyValue(CFDefinition.Name name, Term value) throws InvalidRequestException
+    public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException
     {
-        addKeyValues(name, new Restriction.EQ(value, false));
+        addKeyValues(def, new Restriction.EQ(value, false));
     }
 
     public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
         for (Relation rel : whereClause)
         {
-            CFDefinition.Name name = cfDef.get(rel.getEntity());
-            if (name == null)
+            ColumnDefinition def = cfm.getColumnDefinition(rel.getEntity());
+            if (def == null)
                 throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
 
-            switch (name.kind)
+            switch (def.kind)
             {
-                case KEY_ALIAS:
-                case COLUMN_ALIAS:
+                case PARTITION_KEY:
+                case CLUSTERING_COLUMN:
                     Restriction restriction;
 
                     if (rel.operator() == Relation.Type.EQ)
                     {
-                        Term t = rel.getValue().prepare(name);
+                        Term t = rel.getValue().prepare(keyspace(), def);
                         t.collectMarkerSpecification(names);
                         restriction = new Restriction.EQ(t, false);
                     }
-                    else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
+                    else if (def.kind == ColumnDefinition.Kind.PARTITION_KEY && rel.operator() == Relation.Type.IN)
                     {
                         if (rel.getValue() != null)
                         {
-                            Term t = rel.getValue().prepare(name);
+                            Term t = rel.getValue().prepare(keyspace(), def);
                             t.collectMarkerSpecification(names);
                             restriction = Restriction.IN.create(t);
                         }
@@ -287,7 +264,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                             List<Term> values = new ArrayList<Term>(rel.getInValues().size());
                             for (Term.Raw raw : rel.getInValues())
                             {
-                                Term t = raw.prepare(name);
+                                Term t = raw.prepare(keyspace(), def);
                                 t.collectMarkerSpecification(names);
                                 values.add(t);
                             }
@@ -296,40 +273,37 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                     }
                     else
                     {
-                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
+                        throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), def.name));
                     }
 
-                    addKeyValues(name, restriction);
+                    addKeyValues(def, restriction);
                     break;
-                case VALUE_ALIAS:
-                case COLUMN_METADATA:
-                case STATIC:
-                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
+                default:
+                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", def.name));
             }
         }
     }
 
-    public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+    public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+        CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        for (CFDefinition.Name name : cfDef.partitionKeys())
+        for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
-            Restriction r = processedKeys.get(name.name);
+            Restriction r = processedKeys.get(def.name);
             if (r == null)
-                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
 
             if (keyBuilder.remainingCount() == 1)
             {
                 for (ByteBuffer val : values)
                 {
                     if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                    ByteBuffer key = keyBuilder.copy().add(val).build();
+                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
+                    ByteBuffer key = keyBuilder.buildWith(val).toByteBuffer();
                     ThriftValidation.validateKey(cfm, key);
                     keys.add(key);
                 }
@@ -340,14 +314,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                     throw new InvalidRequestException("IN is only supported on the last column of the partition key");
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
                 keyBuilder.add(val);
             }
         }
         return keys;
     }
 
-    public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+    public Composite createClusteringPrefix(QueryOptions options)
     throws InvalidRequestException
     {
         // If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -364,96 +338,83 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         {
             // If we set no non-static columns, then it's fine not to have clustering columns
             if (hasNoClusteringColumns)
-                return cfm.getStaticColumnNameBuilder();
+                return cfm.comparator.staticPrefix();
 
             // If we do have clustering columns however, then either it's an INSERT and the query is valid
             // but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject
             // (see above)
             if (type != StatementType.INSERT)
             {
-                for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
-                    if (processedKeys.get(name.name) != null)
-                        throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", name.name, type));
+                for (ColumnDefinition def : cfm.clusteringColumns())
+                    if (processedKeys.get(def.name) != null)
+                        throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", def.name, type));
                 // we should get there as it contradicts hasNoClusteringColumns == false
                 throw new AssertionError();
             }
         }
 
-        return createClusteringPrefixBuilderInternal(variables);
+        return createClusteringPrefixBuilderInternal(options);
     }
 
-    private ColumnNameBuilder updatePrefixFor(ByteBuffer name, ColumnNameBuilder prefix)
-    {
-        return isStatic(name) ? cfm.getStaticColumnNameBuilder() : prefix;
-    }
-
-    public boolean isStatic(ByteBuffer name)
-    {
-        ColumnDefinition def = cfm.getColumnDefinition(name);
-        return def != null && def.type == ColumnDefinition.Type.STATIC;
-    }
-
-    private ColumnNameBuilder createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+    private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        CFDefinition.Name firstEmptyKey = null;
-        for (CFDefinition.Name name : cfDef.clusteringColumns())
+        CBuilder builder = cfm.comparator.prefixBuilder();
+        ColumnDefinition firstEmptyKey = null;
+        for (ColumnDefinition def : cfm.clusteringColumns())
         {
-            Restriction r = processedKeys.get(name.name);
+            Restriction r = processedKeys.get(def.name);
             if (r == null)
             {
-                firstEmptyKey = name;
-                if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
-                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+                firstEmptyKey = def;
+                if (requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound())
+                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
             }
             else if (firstEmptyKey != null)
             {
-                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name));
+                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, def.name));
             }
             else
             {
-                List<ByteBuffer> values = r.values(variables);
+                List<ByteBuffer> values = r.values(options);
                 assert values.size() == 1; // We only allow IN for row keys so far
                 ByteBuffer val = values.get(0);
                 if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
+                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
                 builder.add(val);
             }
         }
-        return builder;
+        return builder.build();
     }
 
-    protected CFDefinition.Name getFirstEmptyKey()
+    protected ColumnDefinition getFirstEmptyKey()
     {
-        for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
+        for (ColumnDefinition def : cfm.clusteringColumns())
         {
-            if (processedKeys.get(name.name) == null)
-                return name;
+            if (processedKeys.get(def.name) == null)
+                return def;
         }
         return null;
     }
 
-    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         // Lists SET operation incurs a read.
-        Set<ByteBuffer> toRead = null;
+        boolean requiresRead = false;
         for (Operation op : columnOperations)
         {
             if (op.requiresRead())
             {
-                if (toRead == null)
-                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
-                toRead.add(op.columnName.key);
+                requiresRead = true;
+                break;
             }
         }
 
-        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
+        return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null;
     }
 
-    private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+    protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         try
@@ -465,16 +426,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
         }
 
-        ColumnSlice[] slices = new ColumnSlice[toRead.size()];
-        int i = 0;
-        for (ByteBuffer name : toRead)
-        {
-            ColumnNameBuilder prefix = updatePrefixFor(name, clusteringPrefix);
-            ByteBuffer start = prefix.copy().add(name).build();
-            ByteBuffer finish = prefix.copy().add(name).buildAsEndOfRange();
-            slices[i++] = new ColumnSlice(start, finish);
-        }
-
+        ColumnSlice[] slices = new ColumnSlice[]{ rowPrefix.slice() };
         List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
         long now = System.currentTimeMillis();
         for (ByteBuffer key : partitionKeys)
@@ -488,20 +440,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                        ? SelectStatement.readLocally(keyspace(), commands)
                        : StorageProxy.read(commands, cl);
 
-        Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+        Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
         for (Row row : rows)
         {
-            if (row.cf == null || row.cf.getColumnCount() == 0)
+            if (row.cf == null || row.cf.isEmpty())
                 continue;
 
-            ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
-            for (Column column : row.cf)
-                groupBuilder.add(column);
-
-            List<ColumnGroupMap> groups = groupBuilder.groups();
-            assert groups.isEmpty() || groups.size() == 1;
-            if (!groups.isEmpty())
-                map.put(row.key.key, groups.get(0));
+            Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
+            if (iter.hasNext())
+            {
+                map.put(row.key.getKey(), iter.next());
+                // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
+                assert !iter.hasNext();
+            }
         }
         return map;
     }
@@ -537,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -547,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = options.getValues();
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
         // We don't support IN for CAS operation so far
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
         ByteBuffer key = keys.get(0);
 
-        CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
-        ColumnNameBuilder prefix = createClusteringPrefixBuilder(variables);
-        ColumnFamily updates = UnsortedColumns.factory.create(cfm);
-        addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+        long now = options.getTimestamp(queryState);
+        CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+        Composite prefix = createClusteringPrefix(options);
+        ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
+        addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
@@ -570,16 +521,16 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return new ResultMessage.Rows(buildCasResultSet(key, result));
     }
 
-    public void addUpdatesAndConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+    public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
     throws InvalidRequestException
     {
-        UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+        UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
         addUpdateForKey(updates, key, clusteringPrefix, updParams);
 
         if (ifNotExists)
         {
             // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
-            // columns and the prefix should be the rowPrefix. But if only static columns are set, then the ifNotExists apply to the existence
+            // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence
             // of any static columns and we should use the prefix for the "static part" of the partition.
             conditions.addNotExist(clusteringPrefix);
         }
@@ -590,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
         {
             if (columnConditions != null)
-                conditions.addConditions(clusteringPrefix, columnConditions, variables);
+                conditions.addConditions(clusteringPrefix, columnConditions, options);
             if (staticConditions != null)
-                conditions.addConditions(cfm.getStaticColumnNameBuilder(), staticConditions, variables);
+                conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
         }
     }
 
@@ -601,7 +552,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false);
     }
 
-    public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+    public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch)
     throws InvalidRequestException
     {
         boolean success = cf == null;
@@ -637,34 +588,33 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return new ResultSet(new ResultSet.Metadata(specs), rows);
     }
 
-    private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+    private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cf.metadata().getCfDef();
-
+        CFMetaData cfm = cf.metadata();
         Selection selection;
         if (columnsWithConditions == null)
         {
-            selection = Selection.wildcard(cfDef);
+            selection = Selection.wildcard(cfm);
         }
         else
         {
-            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>();
+            List<ColumnDefinition> defs = new ArrayList<>();
             // Adding the partition key for batches to disambiguate if the conditions span multipe rows (we don't add them outside
             // of batches for compatibility sakes).
             if (isBatch)
             {
-                names.addAll(cfDef.partitionKeys());
-                names.addAll(cfDef.clusteringColumns());
+                defs.addAll(cfm.partitionKeyColumns());
+                defs.addAll(cfm.clusteringColumns());
             }
-            for (ColumnIdentifier id : columnsWithConditions)
-                names.add(cfDef.get(id));
-            selection = Selection.forColumns(names);
+            for (ColumnDefinition def : columnsWithConditions)
+                defs.add(def);
+            selection = Selection.forColumns(defs);
         }
 
         long now = System.currentTimeMillis();
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
-        SelectStatement.forSelection(cfDef, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
 
         return builder.build();
     }
@@ -674,15 +624,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
-            mutation.apply();
+        for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
+        {
+            // We don't use counters internally.
+            assert mutation instanceof Mutation;
+            ((Mutation) mutation).apply();
+        }
         return null;
     }
 
     /**
      * Convert statement into a list of mutations to apply on the server
      *
-     * @param variables value for prepared statement markers
+     * @param options value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
      * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
      * @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -690,37 +644,36 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
-        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
+        Composite clusteringPrefix = createClusteringPrefix(options);
 
-        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
         {
             ThriftValidation.validateKey(cfm, key);
-            ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
-            RowMutation rm = new RowMutation(cfm.ksName, key, cf);
-            mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
+            Mutation mut = new Mutation(cfm.ksName, key, cf);
+            mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
         }
         return mutations;
     }
 
     public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
-                                                 ColumnNameBuilder prefix,
-                                                 List<ByteBuffer> variables,
+                                                 Composite prefix,
+                                                 QueryOptions options,
                                                  boolean local,
-                                                 ConsistencyLevel cl,
                                                  long now)
     throws RequestExecutionException, RequestValidationException
     {
         // Some lists operation requires reading
-        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
-        return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+        return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
     }
 
     public static abstract class Parsed extends CFStatement
@@ -749,16 +702,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
         {
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
-            CFDefinition cfDef = metadata.getCfDef();
-
-            // The collected count in the beginning of preparation.
-            // Will start at non-zero for statements nested inside a BatchStatement (the second and the further ones).
-            int collected = boundNames.getCollectedCount();
 
             Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
             preparedAttributes.collectMarkerSpecification(boundNames);
 
-            ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
+            ModificationStatement stmt = prepareInternal(metadata, boundNames, preparedAttributes);
 
             if (ifNotExists || ifExists || !conditions.isEmpty())
             {
@@ -766,7 +714,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                     throw new InvalidRequestException("Conditional updates are not supported on counter tables");
 
                 if (attrs.timestamp != null)
-                    throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+                    throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
 
                 if (ifNotExists)
                 {
@@ -786,32 +734,28 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 {
                     for (Pair<ColumnIdentifier, ColumnCondition.Raw> entry : conditions)
                     {
-                        CFDefinition.Name name = cfDef.get(entry.left);
-                        if (name == null)
+                        ColumnDefinition def = metadata.getColumnDefinition(entry.left);
+                        if (def == null)
                             throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
-                        ColumnCondition condition = entry.right.prepare(name);
+                        ColumnCondition condition = entry.right.prepare(keyspace(), def);
                         condition.collectMarkerSpecification(boundNames);
 
-                        switch (name.kind)
+                        switch (def.kind)
                         {
-                            case KEY_ALIAS:
-                            case COLUMN_ALIAS:
+                            case PARTITION_KEY:
+                            case CLUSTERING_COLUMN:
                                 throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
-                            case VALUE_ALIAS:
-                            case COLUMN_METADATA:
-                            case STATIC:
+                            default:
                                 stmt.addCondition(condition);
                                 break;
                         }
                     }
                 }
             }
-
-            stmt.boundTerms = boundNames.getCollectedCount() - collected;
             return stmt;
         }
 
-        protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
+        protected abstract ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
     }
 }


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/56c76beb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/56c76beb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/56c76beb

Branch: refs/heads/trunk
Commit: 56c76bebe2f79569e6ccd83deb00254fa7da4776
Parents: fe2d7dd 60dbe8b
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 7 16:56:13 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 7 16:56:13 2014 -0400

----------------------------------------------------------------------
 .../apache/cassandra/cql3/statements/ModificationStatement.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------