You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/20 10:15:16 UTC

[1/2] Add static columns in CQL3

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 b2b3055c7 -> b09d87691


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 4852cf7..cd5f2a2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.utils.Pair;
  */
 public class DeleteStatement extends ModificationStatement
 {
-    private DeleteStatement(CFMetaData cfm, Attributes attrs)
+    private DeleteStatement(StatementType type, CFMetaData cfm, Attributes attrs)
     {
-        super(cfm, attrs);
+        super(type, cfm, attrs);
     }
 
     public boolean requireFullClusteringKey()
@@ -44,15 +44,30 @@ public class DeleteStatement extends ModificationStatement
     public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
     throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
         ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+        addUpdateForKey(cf, key, builder, params);
+        return cf;
+    }
+
+    public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+    throws InvalidRequestException
+    {
+        CFDefinition cfDef = cfm.getCfDef();
         List<Operation> deletions = getOperations();
 
-        boolean fullKey = builder.componentCount() == cfDef.columns.size();
+        boolean fullKey = builder.componentCount() == cfDef.clusteringColumnsCount();
         boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || deletions.isEmpty());
 
         if (!deletions.isEmpty() && isRange)
-            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
+        {
+            // We only get there if we have at least one non-static columns selected, as otherwise the builder will be
+            // the "static" builder and isRange will be false. But we may still have static columns, so pick the first
+            // non static one for the error message so it's not confusing
+            for (Operation deletion : deletions)
+                if (cfm.getCfDef().get(deletion.columnName).kind != CFDefinition.Name.Kind.STATIC)
+                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletion.columnName));
+            throw new AssertionError();
+        }
 
         if (deletions.isEmpty() && builder.componentCount() == 0)
         {
@@ -83,8 +98,6 @@ public class DeleteStatement extends ModificationStatement
                 }
             }
         }
-
-        return cf;
     }
 
     public static class Parsed extends ModificationStatement.Parsed
@@ -96,7 +109,7 @@ public class DeleteStatement extends ModificationStatement
                       Attributes.Raw attrs,
                       List<Operation.RawDeletion> deletions,
                       List<Relation> whereClause,
-                      List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions)
+                      List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions)
         {
             super(name, attrs, conditions, false);
             this.deletions = deletions;
@@ -105,7 +118,7 @@ public class DeleteStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            DeleteStatement stmt = new DeleteStatement(cfDef.cfm, attrs);
+            DeleteStatement stmt = new DeleteStatement(ModificationStatement.StatementType.DELETE, cfDef.cfm, attrs);
 
             for (Operation.RawDeletion deletion : deletions)
             {
@@ -115,7 +128,7 @@ public class DeleteStatement extends ModificationStatement
 
                 // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
                 // list. However, we support having the value name for coherence with the static/sparse case
-                if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+                if (name.isPrimaryKeyColumn())
                     throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", name));
 
                 Operation op = deletion.prepare(name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 676286c..ac8d2e1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,12 +20,15 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
@@ -56,6 +59,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     private static boolean loggedCounterTTL = false;
     private static boolean loggedCounterTimestamp = false;
 
+    public static enum StatementType { INSERT, UPDATE, DELETE }
+    public final StatementType type;
+
     public final CFMetaData cfm;
     public final Attributes attrs;
 
@@ -63,11 +69,25 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     private final List<Operation> columnOperations = new ArrayList<Operation>();
 
     private int boundTerms;
-    private List<Operation> columnConditions;
+    // Separating normal and static conditions makes things somewhat easier
+    private List<ColumnCondition> columnConditions;
+    private List<ColumnCondition> staticConditions;
     private boolean ifNotExists;
 
-    public ModificationStatement(CFMetaData cfm, Attributes attrs)
+    private boolean hasNoClusteringColumns = true;
+    private boolean setsOnlyStaticColumns;
+
+    private final Function<ColumnCondition, ColumnIdentifier> getColumnForCondition = new Function<ColumnCondition, ColumnIdentifier>()
     {
+        public ColumnIdentifier apply(ColumnCondition cond)
+        {
+            return cond.column.name;
+        }
+    };
+
+    public ModificationStatement(StatementType type, CFMetaData cfm, Attributes attrs)
+    {
+        this.type = type;
         this.cfm = cfm;
         this.attrs = attrs;
     }
@@ -78,11 +98,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
              + meter.measureDeep(attrs)
              + meter.measureDeep(processedKeys)
              + meter.measureDeep(columnOperations)
-             + (columnConditions == null ? 0 : meter.measureDeep(columnConditions));
+             + (columnConditions == null ? 0 : meter.measureDeep(columnConditions))
+             + (staticConditions == null ? 0 : meter.measureDeep(staticConditions));
     }
 
     public abstract boolean requireFullClusteringKey();
-    public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+    public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
 
     public int getBoundTerms()
     {
@@ -155,6 +176,15 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     public void addOperation(Operation op)
     {
+        if (op.isStatic(cfm))
+        {
+            if (columnOperations.isEmpty())
+                setsOnlyStaticColumns = true;
+        }
+        else
+        {
+            setsOnlyStaticColumns = false;
+        }
         columnOperations.add(op);
     }
 
@@ -163,12 +193,31 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return columnOperations;
     }
 
-    public void addCondition(Operation op)
+    public Iterable<ColumnIdentifier> getColumnsWithConditions()
     {
-        if (columnConditions == null)
-            columnConditions = new ArrayList<Operation>();
+        if (ifNotExists)
+            return null;
+
+        return Iterables.concat(columnConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
+                                staticConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
+    }
 
-        columnConditions.add(op);
+    public void addCondition(ColumnCondition cond) throws InvalidRequestException
+    {
+        List<ColumnCondition> conds = null;
+        if (cond.column.kind == CFDefinition.Name.Kind.STATIC)
+        {
+            if (staticConditions == null)
+                staticConditions = new ArrayList<ColumnCondition>();
+            conds = staticConditions;
+        }
+        else
+        {
+            if (columnConditions == null)
+                columnConditions = new ArrayList<ColumnCondition>();
+            conds = columnConditions;
+        }
+        conds.add(cond);
     }
 
     public void setIfNotExistCondition()
@@ -176,13 +225,20 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         ifNotExists = true;
     }
 
-    private void addKeyValues(ColumnIdentifier name, Restriction values) throws InvalidRequestException
+    public boolean hasIfNotExistCondition()
     {
-        if (processedKeys.put(name, values) != null)
-            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+        return ifNotExists;
     }
 
-    public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException
+    private void addKeyValues(CFDefinition.Name name, Restriction values) throws InvalidRequestException
+    {
+        if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
+            hasNoClusteringColumns = false;
+        if (processedKeys.put(name.name, values) != null)
+            throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name.name));
+    }
+
+    public void addKeyValue(CFDefinition.Name name, Term value) throws InvalidRequestException
     {
         addKeyValues(name, new Restriction.EQ(value, false));
     }
@@ -233,10 +289,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                         throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
                     }
 
-                    addKeyValues(name.name, restriction);
+                    addKeyValues(name, restriction);
                     break;
                 case VALUE_ALIAS:
                 case COLUMN_METADATA:
+                case STATIC:
                     throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
             }
         }
@@ -248,7 +305,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         CFDefinition cfDef = cfm.getCfDef();
         ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        for (CFDefinition.Name name : cfDef.keys.values())
+        for (CFDefinition.Name name : cfDef.partitionKeys())
         {
             Restriction r = processedKeys.get(name.name);
             if (r == null)
@@ -262,7 +319,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 {
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                    keys.add(keyBuilder.copy().add(val).build());
+                    ByteBuffer key = keyBuilder.copy().add(val).build();
+                    ThriftValidation.validateKey(cfm, key);
+                    keys.add(key);
                 }
             }
             else
@@ -281,10 +340,46 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
     throws InvalidRequestException
     {
+        // If the only updated/deleted columns are static, then we don't need clustering columns.
+        // And in fact, unless it is an INSERT, we reject if clustering colums are provided as that
+        // suggest something unintended. For instance, given:
+        //   CREATE TABLE t (k int, v int, s int static, PRIMARY KEY (k, v))
+        // it can make sense to do:
+        //   INSERT INTO t(k, v, s) VALUES (0, 1, 2)
+        // but both
+        //   UPDATE t SET s = 3 WHERE k = 0 AND v = 1
+        //   DELETE v FROM t WHERE k = 0 AND v = 1
+        // sounds like you don't really understand what your are doing.
+        if (setsOnlyStaticColumns && columnConditions == null && (type != StatementType.INSERT || hasNoClusteringColumns))
+        {
+            // Reject if any clustering columns is set
+            for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
+                if (processedKeys.get(name.name) != null)
+                    throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", name.name, type));
+            return cfm.getStaticColumnNameBuilder();
+        }
+
+        return createClusteringPrefixBuilderInternal(variables);
+    }
+
+    private ColumnNameBuilder updatePrefixFor(ByteBuffer name, ColumnNameBuilder prefix)
+    {
+        return isStatic(name) ? cfm.getStaticColumnNameBuilder() : prefix;
+    }
+
+    public boolean isStatic(ByteBuffer name)
+    {
+        ColumnDefinition def = cfm.getColumnDefinition(name);
+        return def != null && def.type == ColumnDefinition.Type.STATIC;
+    }
+
+    private ColumnNameBuilder createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+    throws InvalidRequestException
+    {
         CFDefinition cfDef = cfm.getCfDef();
         ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
         CFDefinition.Name firstEmptyKey = null;
-        for (CFDefinition.Name name : cfDef.columns.values())
+        for (CFDefinition.Name name : cfDef.clusteringColumns())
         {
             Restriction r = processedKeys.get(name.name);
             if (r == null)
@@ -312,7 +407,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     protected CFDefinition.Name getFirstEmptyKey()
     {
-        for (CFDefinition.Name name : cfm.getCfDef().columns.values())
+        for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
         {
             if (processedKeys.get(name.name) == null)
                 return name;
@@ -354,8 +449,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         int i = 0;
         for (ByteBuffer name : toRead)
         {
-            ByteBuffer start = clusteringPrefix.copy().add(name).build();
-            ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
+            ColumnNameBuilder prefix = updatePrefixFor(name, clusteringPrefix);
+            ByteBuffer start = prefix.copy().add(name).build();
+            ByteBuffer finish = prefix.copy().add(name).buildAsEndOfRange();
             slices[i++] = new ColumnSlice(start, finish);
         }
 
@@ -392,7 +488,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     public boolean hasConditions()
     {
-        return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
+        return ifNotExists
+            || (columnConditions != null && !columnConditions.isEmpty())
+            || (staticConditions != null && !staticConditions.isEmpty());
     }
 
     public ResultMessage execute(QueryState queryState, QueryOptions options)
@@ -434,20 +532,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
-        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
-
         ByteBuffer key = keys.get(0);
-        ThriftValidation.validateKey(cfm, key);
-
-        UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null);
-        ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams);
 
         // It's cleaner to use the query timestamp below, but it's in seconds while the conditions expects microseconds, so just
         // put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divide it).
-        long now = queryState.getTimestamp() * 1000;
-        CASConditions conditions = ifNotExists
-                                 ? new NotExistCondition(clusteringPrefix, now)
-                                 : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now);
+        CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp() * 1000);
+        ColumnNameBuilder prefix = createClusteringPrefixBuilder(variables);
+        ColumnFamily updates = UnsortedColumns.factory.create(cfm);
+        addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
@@ -459,16 +551,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return new ResultMessage.Rows(buildCasResultSet(key, result));
     }
 
+    public void addUpdatesAndConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+    throws InvalidRequestException
+    {
+        UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+        addUpdateForKey(updates, key, clusteringPrefix, updParams);
+
+        if (ifNotExists)
+        {
+            // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
+            // columns and the prefix should be the rowPrefix. But if only static columns are set, then the ifNotExists apply to the existence
+            // of any static columns and we should use the prefix for the "static part" of the partition.
+            conditions.addNotExist(setsOnlyStaticColumns ? cfm.getStaticColumnNameBuilder() : clusteringPrefix);
+        }
+        else
+        {
+            if (columnConditions != null)
+                conditions.addConditions(clusteringPrefix, columnConditions, variables);
+            if (staticConditions != null)
+                conditions.addConditions(cfm.getStaticColumnNameBuilder(), staticConditions, variables);
+        }
+    }
+
     private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
     {
+        return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false);
+    }
+
+    public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+    throws InvalidRequestException
+    {
         boolean success = cf == null;
 
-        ColumnSpecification spec = new ColumnSpecification(keyspace(), columnFamily(), CAS_RESULT_COLUMN, BooleanType.instance);
+        ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
         ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(spec));
         List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);
-        return success ? rs : merge(rs, buildCasFailureResultSet(key, cf));
+        return success ? rs : merge(rs, buildCasFailureResultSet(key, cf, columnsWithConditions, isBatch));
     }
 
     private static ResultSet merge(ResultSet left, ResultSet right)
@@ -478,31 +598,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else if (right.size() == 0)
             return left;
 
-        assert left.size() == 1 && right.size() == 1;
+        assert left.size() == 1;
         int size = left.metadata.names.size() + right.metadata.names.size();
         List<ColumnSpecification> specs = new ArrayList<ColumnSpecification>(size);
         specs.addAll(left.metadata.names);
         specs.addAll(right.metadata.names);
-        List<ByteBuffer> row = new ArrayList<ByteBuffer>(size);
-        row.addAll(left.rows.get(0));
-        row.addAll(right.rows.get(0));
-        return new ResultSet(new ResultSet.Metadata(specs), Collections.singletonList(row));
+        List<List<ByteBuffer>> rows = new ArrayList<>(right.size());
+        for (int i = 0; i < right.size(); i++)
+        {
+            List<ByteBuffer> row = new ArrayList<ByteBuffer>(size);
+            row.addAll(left.rows.get(0));
+            row.addAll(right.rows.get(i));
+            rows.add(row);
+        }
+        return new ResultSet(new ResultSet.Metadata(specs), rows);
     }
 
-    private ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
+    private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+    throws InvalidRequestException
     {
-        CFDefinition cfDef = cfm.getCfDef();
+        CFDefinition cfDef = cf.metadata().getCfDef();
 
         Selection selection;
-        if (ifNotExists)
+        if (columnsWithConditions == null)
         {
             selection = Selection.wildcard(cfDef);
         }
         else
         {
-            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>(columnConditions.size());
-            for (Operation condition : columnConditions)
-                names.add(cfDef.get(condition.columnName));
+            List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>();
+            // Adding the partition key for batches to disambiguate if the conditions span multipe rows (we don't add them outside
+            // of batches for compatibility sakes).
+            if (isBatch)
+            {
+                names.addAll(cfDef.partitionKeys());
+                names.addAll(cfDef.clusteringColumns());
+            }
+            for (ColumnIdentifier id : columnsWithConditions)
+                names.add(cfDef.get(id));
             selection = Selection.forColumns(names);
         }
 
@@ -548,7 +681,8 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         for (ByteBuffer key: keys)
         {
             ThriftValidation.validateKey(cfm, key);
-            ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
+            ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+            addUpdateForKey(cf, key, clusteringPrefix, params);
             mutations.add(makeMutation(key, cf, cl, isBatch));
         }
         return mutations;
@@ -570,104 +704,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return isCounter() ? new CounterMutation(rm, cl) : rm;
     }
 
-    private static abstract class CQL3CasConditions implements CASConditions
-    {
-        protected final ColumnNameBuilder rowPrefix;
-        protected final long now;
-
-        protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
-        {
-            this.rowPrefix = rowPrefix;
-            this.now = now;
-        }
-
-        public IDiskAtomFilter readFilter()
-        {
-            // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
-            // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the
-            // row marker for that (see #6623)
-            return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
-        }
-    }
-
-    private static class NotExistCondition extends CQL3CasConditions
-    {
-        private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
-        {
-            super(rowPrefix, now);
-        }
-
-        public boolean appliesTo(ColumnFamily current)
-        {
-            return current == null || current.hasOnlyTombstones(now);
-        }
-    }
-
-    private static class ColumnsConditions extends CQL3CasConditions
-    {
-        private final ColumnFamily expected;
-
-        private ColumnsConditions(ColumnNameBuilder rowPrefix,
-                                  CFMetaData cfm,
-                                  ByteBuffer key,
-                                  Collection<Operation> conditions,
-                                  List<ByteBuffer> variables,
-                                  long now) throws InvalidRequestException
-        {
-            super(rowPrefix, now);
-            this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
-
-            // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
-            // for it to expire before the actual build of the conditions which would break since we would then testing for the presence of tombstones.
-            UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
-
-            // Conditions
-            for (Operation condition : conditions)
-                condition.execute(key, expected, rowPrefix.copy(), params);
-        }
-
-        public boolean appliesTo(ColumnFamily current)
-        {
-            if (current == null)
-                return false;
-
-            for (Column e : expected)
-            {
-                Column c = current.getColumn(e.name());
-                if (e.isLive(now))
-                {
-                    if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
-                        return false;
-                }
-                else
-                {
-                    // If we have a tombstone in expected, it means the condition tests that the column is
-                    // null, so check that we have no value
-                    if (c != null && c.isLive(now))
-                        return false;
-                }
-            }
-            return true;
-        }
-
-        @Override
-        public String toString()
-        {
-            return expected.toString();
-        }
-    }
-
     public static abstract class Parsed extends CFStatement
     {
         protected final Attributes.Raw attrs;
-        private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
+        private final List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions;
         private final boolean ifNotExists;
 
-        protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists)
+        protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions, boolean ifNotExists)
         {
             super(name);
             this.attrs = attrs;
-            this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions;
+            this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions;
             this.ifNotExists = ifNotExists;
         }
 
@@ -709,24 +756,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 }
                 else
                 {
-                    for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
+                    for (Pair<ColumnIdentifier, ColumnCondition.Raw> entry : conditions)
                     {
                         CFDefinition.Name name = cfDef.get(entry.left);
                         if (name == null)
                             throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 
-                        /*
-                         * Lists column names are based on a server-side generated timeuuid. So we can't allow lists
-                         * operation or that would yield unexpected results (update that should apply wouldn't). So for
-                         * now, we just refuse lists, which also save use from having to bother about the read that some
-                         * list operation involve.
-                         */
-                        if (name.type instanceof ListType)
-                            throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
-
-                        Operation condition = entry.right.prepare(name);
-                        assert !condition.requiresRead();
-
+                        ColumnCondition condition = entry.right.prepare(name);
                         condition.collectMarkerSpecification(boundNames);
 
                         switch (name.kind)
@@ -736,6 +772,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                                 throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
                             case VALUE_ALIAS:
                             case COLUMN_METADATA:
+                            case STATIC:
                                 stmt.addCondition(condition);
                                 break;
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 52a7c70..2636c83 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -83,18 +83,57 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
     private Map<CFDefinition.Name, Integer> orderingIndexes;
 
+    private boolean selectsStaticColumns;
+    private boolean selectsOnlyStaticColumns;
+
     // Used by forSelection below
     private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier, Boolean>emptyMap(), false, false, null, false);
 
+    private static final Predicate<CFDefinition.Name> isStaticFilter = new Predicate<CFDefinition.Name>()
+    {
+        public boolean apply(CFDefinition.Name name)
+        {
+            return name.kind == CFDefinition.Name.Kind.STATIC;
+        }
+    };
+
     public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters parameters, Selection selection, Term limit)
     {
         this.cfDef = cfDef;
         this.boundTerms = boundTerms;
         this.selection = selection;
-        this.keyRestrictions = new Restriction[cfDef.keys.size()];
-        this.columnRestrictions = new Restriction[cfDef.columns.size()];
+        this.keyRestrictions = new Restriction[cfDef.partitionKeyCount()];
+        this.columnRestrictions = new Restriction[cfDef.clusteringColumnsCount()];
         this.parameters = parameters;
         this.limit = limit;
+
+        // Now gather a few info on whether we should bother with static columns or not for this statement
+        initStaticColumnsInfo();
+    }
+
+    private void initStaticColumnsInfo()
+    {
+        if (!cfDef.cfm.hasStaticColumns())
+            return;
+
+        // If it's a wildcard, we do select static but not only them
+        if (selection.isWildcard())
+        {
+            selectsStaticColumns = true;
+            return;
+        }
+
+        // Otherwise, check the selected columns
+        selectsStaticColumns = !Iterables.isEmpty(Iterables.filter(selection.getColumnsList(), isStaticFilter));
+        selectsOnlyStaticColumns = true;
+        for (CFDefinition.Name name : selection.getColumnsList())
+        {
+            if (name.kind != CFDefinition.Name.Kind.KEY_ALIAS && name.kind != CFDefinition.Name.Kind.STATIC)
+            {
+                selectsOnlyStaticColumns = false;
+                break;
+            }
+        }
     }
 
     // Creates a simple select based on the given selection.
@@ -383,35 +422,91 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             // to account for the grouping of columns.
             // Since that doesn't work for maps/sets/lists, we now use the compositesToGroup option of SliceQueryFilter.
             // But we must preserve backward compatibility too (for mixed version cluster that is).
-            int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
+            int toGroup = cfDef.isCompact ? -1 : cfDef.clusteringColumnsCount();
             List<ByteBuffer> startBounds = getRequestedBound(Bound.START, variables);
             List<ByteBuffer> endBounds = getRequestedBound(Bound.END, variables);
             assert startBounds.size() == endBounds.size();
 
+            // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
+            // the part of the index to query so adding the static slice would be useless and confusing.
+            // For 2i, static columns are retrieve in CompositesSearcher with each index hit.
+            ColumnSlice staticSlice = null;
+            if (selectsStaticColumns && !usesSecondaryIndexing)
+            {
+                ColumnNameBuilder staticPrefix = cfDef.cfm.getStaticColumnNameBuilder();
+                // Note: we could use staticPrefix.build() for the start bound, but EMPTY_BYTE_BUFFER gives us the
+                // same effect while saving a few CPU cycles.
+                staticSlice = isReversed
+                            ? new ColumnSlice(staticPrefix.buildAsEndOfRange(), ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                            : new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, staticPrefix.buildAsEndOfRange());
+
+                // In the case where we only select static columns, we want to really only check the static columns.
+                // So we return early as the rest of that method would actually make us query everything
+                if (selectsOnlyStaticColumns)
+                    return sliceFilter(staticSlice, limit, toGroup);
+            }
+
             // The case where startBounds == 1 is common enough that it's worth optimizing
-            ColumnSlice[] slices;
             if (startBounds.size() == 1)
             {
                 ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0));
                 if (slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
-                    return null;
-                slices = new ColumnSlice[]{slice};
+                    return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
+
+                return staticSlice == null
+                     ? sliceFilter(slice, limit, toGroup)
+                     : (slice.includes(cfDef.cfm.comparator, staticSlice.finish) ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup)
+                                                                                 : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup));
+            }
+
+            List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
+            for (int i = 0; i < startBounds.size(); i++)
+            {
+                ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
+                if (!slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+                    l.add(slice);
+            }
+
+            if (l.isEmpty())
+                return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
+            if (staticSlice == null)
+                return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup);
+
+            // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but
+            // if there is a static slice, it could overlap with the 2nd slice. Check for it and correct if
+            // that's the case
+            ColumnSlice[] slices;
+            if (isReversed)
+            {
+                if (l.get(l.size() - 1).includes(cfDef.cfm.comparator, staticSlice.start))
+                {
+                    slices = l.toArray(new ColumnSlice[l.size()]);
+                    slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                }
+                else
+                {
+                    slices = l.toArray(new ColumnSlice[l.size()+1]);
+                    slices[slices.length-1] = staticSlice;
+                }
             }
             else
             {
-                List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
-                for (int i = 0; i < startBounds.size(); i++)
+                if (l.get(0).includes(cfDef.cfm.comparator, staticSlice.finish))
                 {
-                    ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
-                    if (!slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
-                        l.add(slice);
+                    slices = new ColumnSlice[l.size()];
+                    slices[0] = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, l.get(0).finish);
+                    for (int i = 1; i < l.size(); i++)
+                        slices[i] = l.get(i);
+                }
+                else
+                {
+                    slices = new ColumnSlice[l.size()+1];
+                    slices[0] = staticSlice;
+                    for (int i = 0; i < l.size(); i++)
+                        slices[i] = l.get(i);
                 }
-                if (l.isEmpty())
-                    return null;
-                slices = l.toArray(new ColumnSlice[l.size()]);
             }
-
-            return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+            return sliceFilter(slices, limit, toGroup);
         }
         else
         {
@@ -423,6 +518,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup)
+    {
+        return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup);
+    }
+
+    private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup)
+    {
+        return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+    }
+
     private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
     {
         int l = Integer.MAX_VALUE;
@@ -458,7 +563,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         ColumnNameBuilder builder = cfDef.getKeyNameBuilder();
-        for (CFDefinition.Name name : cfDef.keys.values())
+        for (CFDefinition.Name name : cfDef.partitionKeys())
         {
             Restriction r = keyRestrictions[name.position];
             assert r != null && !r.isSlice();
@@ -497,7 +602,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfDef.keys.values(), keyRestrictions, false, cfDef.getKeyNameBuilder(), variables).get(0);
+        return buildBound(b, cfDef.partitionKeys(), keyRestrictions, false, cfDef.getKeyNameBuilder(), variables).get(0);
     }
 
     private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
@@ -556,13 +661,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
     private SortedSet<ByteBuffer> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
     {
+        // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
+        // we always do a slice for CQL3 tables, so it's ok to ignore them here
         assert !isColumnRange();
 
         ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
-        Iterator<ColumnIdentifier> idIter = cfDef.columns.keySet().iterator();
+        Iterator<CFDefinition.Name> idIter = cfDef.clusteringColumns().iterator();
         for (Restriction r : columnRestrictions)
         {
-            ColumnIdentifier id = idIter.next();
+            ColumnIdentifier id = idIter.next().name;
             assert r != null && !r.isSlice();
 
             List<ByteBuffer> values = r.values(variables);
@@ -625,15 +732,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 columns.add(builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build());
 
                 // selected columns
-                for (ColumnIdentifier id : selection.regularColumnsToFetch())
+                for (ColumnIdentifier id : selection.regularAndStaticColumnsToFetch())
                     columns.add(builder.copy().add(id.key).build());
             }
             else
             {
-                Iterator<ColumnIdentifier> iter = cfDef.metadata.keySet().iterator();
+                // We now that we're not composite so we can ignore static columns
+                Iterator<CFDefinition.Name> iter = cfDef.regularColumns().iterator();
                 while (iter.hasNext())
                 {
-                    ColumnIdentifier name = iter.next();
+                    ColumnIdentifier name = iter.next().name;
                     ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
                     ByteBuffer cname = b.add(name.key).build();
                     columns.add(cname);
@@ -760,7 +868,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     private List<ByteBuffer> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
     {
         assert isColumnRange();
-        return buildBound(b, cfDef.columns.values(), columnRestrictions, isReversed, cfDef.getColumnNameBuilder(), variables);
+        return buildBound(b, cfDef.clusteringColumns(), columnRestrictions, isReversed, cfDef.getColumnNameBuilder(), variables);
     }
 
     public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
@@ -781,6 +889,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     restriction = columnRestrictions[name.position];
                     break;
                 case COLUMN_METADATA:
+                case STATIC:
                     restriction = metadataRestrictions.get(name);
                     break;
                 default:
@@ -956,6 +1065,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                             result.add(c);
                             break;
                         case COLUMN_METADATA:
+                        case STATIC:
                             // This should not happen for compact CF
                             throw new AssertionError();
                         default:
@@ -979,8 +1089,35 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 builder.add(c);
             }
 
+            Map<CFDefinition.Name, ByteBuffer> staticValues = Collections.emptyMap();
+            // Gather up static values first
+            if (!builder.isEmpty() && builder.firstGroup().isStatic)
+            {
+                staticValues = new HashMap<>();
+                ColumnGroupMap group = builder.firstGroup();
+                for (CFDefinition.Name name : Iterables.filter(selection.getColumnsList(), isStaticFilter))
+                    staticValues.put(name, name.type.isCollection() ? getCollectionValue(name, group) : getSimpleValue(name, group));
+                builder.discardFirst();
+
+                // If there was static columns but there is no actual row, then provided the select was a full
+                // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
+                // then we want to include the static columns in the result set.
+                if (!staticValues.isEmpty() && builder.isEmpty() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
+                {
+                    result.newRow();
+                    for (CFDefinition.Name name : selection.getColumnsList())
+                    {
+                        if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS)
+                            result.add(keyComponents[name.position]);
+                        else
+                            result.add(name.kind == CFDefinition.Name.Kind.STATIC ? staticValues.get(name) : null);
+                    }
+                    return;
+                }
+            }
+
             for (ColumnGroupMap group : builder.groups())
-                handleGroup(selection, result, keyComponents, group);
+                handleGroup(selection, result, keyComponents, group, staticValues);
         }
         else
         {
@@ -999,6 +1136,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
+    private boolean hasNoClusteringColumnsRestriction()
+    {
+        for (int i = 0; i < columnRestrictions.length; i++)
+            if (columnRestrictions[i] != null)
+                return false;
+        return true;
+    }
+
     /**
      * Orders results when multiple keys are selected (using IN)
      */
@@ -1039,7 +1184,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         Collections.sort(cqlRows.rows, new CompositeComparator(types, positions));
     }
 
-    private void handleGroup(Selection selection, Selection.ResultSetBuilder result, ByteBuffer[] keyComponents, ColumnGroupMap columns) throws InvalidRequestException
+    private void handleGroup(Selection selection,
+                             Selection.ResultSetBuilder result,
+                             ByteBuffer[] keyComponents,
+                             ColumnGroupMap columns,
+                             Map<CFDefinition.Name, ByteBuffer> staticValues) throws InvalidRequestException
     {
         // Respect requested order
         result.newRow();
@@ -1059,21 +1208,32 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 case COLUMN_METADATA:
                     if (name.type.isCollection())
                     {
-                        List<Pair<ByteBuffer, Column>> collection = columns.getCollection(name.name.key);
-                        ByteBuffer value = collection == null
-                                         ? null
-                                         : ((CollectionType)name.type).serialize(collection);
-                        result.add(value);
+                        result.add(getCollectionValue(name, columns));
                     }
                     else
                     {
                         result.add(columns.getSimple(name.name.key));
                     }
                     break;
+                case STATIC:
+                    result.add(staticValues.get(name));
+                    break;
             }
         }
     }
 
+    private static ByteBuffer getCollectionValue(CFDefinition.Name name, ColumnGroupMap columns)
+    {
+        List<Pair<ByteBuffer, Column>> collection = columns.getCollection(name.name.key);
+        return collection == null ? null : ((CollectionType)name.type).serialize(collection);
+    }
+
+    private static ByteBuffer getSimpleValue(CFDefinition.Name name, ColumnGroupMap columns)
+    {
+        Column c = columns.getSimple(name.name.key);
+        return c == null ? null : c.value();
+    }
+
     private static boolean isReversedType(CFDefinition.Name name)
     {
         return name.type instanceof ReversedType;
@@ -1089,6 +1249,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return true;
     }
 
+    private boolean hasClusteringColumnsRestriction()
+    {
+        for (int i = 0; i < columnRestrictions.length; i++)
+            if (columnRestrictions[i] != null)
+                return true;
+        return false;
+    }
+
     public static class RawStatement extends CFStatement
     {
         private final Parameters parameters;
@@ -1122,7 +1290,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                 : Selection.fromSelectors(cfDef, selectClause);
 
             if (parameters.isDistinct)
-                validateDistinctSelection(selection.getColumnsList(), cfDef.keys.values());
+                validateDistinctSelection(selection.getColumnsList(), cfDef.partitionKeys());
 
             Term prepLimit = null;
             if (limit != null)
@@ -1174,6 +1342,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     case VALUE_ALIAS:
                         throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", name.name));
                     case COLUMN_METADATA:
+                    case STATIC:
                         // We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
                         Restriction r = updateRestriction(cfm, name, stmt.metadataRestrictions.get(name), rel, names);
                         if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
@@ -1197,7 +1366,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             boolean canRestrictFurtherComponents = true;
             CFDefinition.Name previous = null;
             stmt.keyIsInRelation = false;
-            Iterator<CFDefinition.Name> iter = cfDef.keys.values().iterator();
+            Iterator<CFDefinition.Name> iter = cfDef.partitionKeys().iterator();
             for (int i = 0; i < stmt.keyRestrictions.length; i++)
             {
                 CFDefinition.Name cname = iter.next();
@@ -1266,7 +1435,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             // All (or none) of the partition key columns have been specified;
             // hence there is no need to turn these restrictions into index expressions.
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.keys.values());
+                stmt.restrictedNames.removeAll(cfDef.partitionKeys());
+
+            if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
+                throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
 
             // If a clustering key column is restricted by a non-EQ relation, all preceding
             // columns must have a EQ, and all following must have no restriction. Unless
@@ -1274,7 +1446,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             canRestrictFurtherComponents = true;
             previous = null;
             boolean previousIsSlice = false;
-            iter = cfDef.columns.values().iterator();
+            iter = cfDef.clusteringColumns().iterator();
             for (int i = 0; i < stmt.columnRestrictions.length; i++)
             {
                 CFDefinition.Name cname = iter.next();
@@ -1332,7 +1504,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 stmt.usesSecondaryIndexing = true;
 
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.columns.values());
+                stmt.restrictedNames.removeAll(cfDef.clusteringColumns());
 
             // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
             // there is restrictions not covered by the PK.
@@ -1343,8 +1515,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 stmt.usesSecondaryIndexing = true;
             }
 
-            if (stmt.usesSecondaryIndexing && stmt.keyIsInRelation)
-                throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+            if (stmt.usesSecondaryIndexing)
+            {
+                if (stmt.keyIsInRelation)
+                    throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+                // When the user only select static columns, the intent is that we don't query the whole partition but just
+                // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on static columns
+                // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical.
+                if (stmt.selectsOnlyStaticColumns)
+                    throw new InvalidRequestException("Queries using 2ndary indexes don't support selecting only static columns");
+            }
 
             if (!stmt.parameters.orderings.isEmpty())
             {
@@ -1401,7 +1581,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     }
                 }
 
-                Boolean[] reversedMap = new Boolean[cfDef.columns.size()];
+                Boolean[] reversedMap = new Boolean[cfDef.clusteringColumnsCount()];
                 int i = 0;
                 for (Map.Entry<ColumnIdentifier, Boolean> entry : stmt.parameters.orderings.entrySet())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 600ee1b..9760311 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -49,6 +49,12 @@ public abstract class Selection
         this.collectTTLs = collectTTLs;
     }
 
+    // Overriden by SimpleSelection when appropriate.
+    public boolean isWildcard()
+    {
+        return false;
+    }
+
     public ResultSet.Metadata getResultMetadata()
     {
         return new ResultSet.Metadata(metadata);
@@ -59,12 +65,12 @@ public abstract class Selection
         List<CFDefinition.Name> all = new ArrayList<CFDefinition.Name>();
         for (CFDefinition.Name name : cfDef)
             all.add(name);
-        return new SimpleSelection(all);
+        return new SimpleSelection(all, true);
     }
 
     public static Selection forColumns(List<CFDefinition.Name> columnsList)
     {
-        return new SimpleSelection(columnsList);
+        return new SimpleSelection(columnsList, false);
     }
 
     private static boolean isUsingFunction(List<RawSelector> rawSelectors)
@@ -105,7 +111,7 @@ public abstract class Selection
             CFDefinition.Name name = cfDef.get(tot.id);
             if (name == null)
                 throw new InvalidRequestException(String.format("Undefined name %s in selection clause", tot.id));
-            if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+            if (name.isPrimaryKeyColumn())
                 throw new InvalidRequestException(String.format("Cannot use selection function %s on PRIMARY KEY part %s", tot.isWritetime ? "writeTime" : "ttl", name));
             if (name.type.isCollection())
                 throw new InvalidRequestException(String.format("Cannot use selection function %s on collections", tot.isWritetime ? "writeTime" : "ttl"));
@@ -195,7 +201,7 @@ public abstract class Selection
                 names.add(name);
                 metadata.add(rawSelector.alias == null ? name : makeAliasSpec(cfDef, name.type, rawSelector.alias));
             }
-            return new SimpleSelection(names, metadata);
+            return new SimpleSelection(names, metadata, false);
         }
     }
 
@@ -204,12 +210,12 @@ public abstract class Selection
     /**
      * @return the list of CQL3 "regular" (the "COLUMN_METADATA" ones) column names to fetch.
      */
-    public List<ColumnIdentifier> regularColumnsToFetch()
+    public List<ColumnIdentifier> regularAndStaticColumnsToFetch()
     {
         List<ColumnIdentifier> toFetch = new ArrayList<ColumnIdentifier>();
         for (CFDefinition.Name name : columnsList)
         {
-            if (name.kind == CFDefinition.Name.Kind.COLUMN_METADATA)
+            if (name.kind == CFDefinition.Name.Kind.COLUMN_METADATA || name.kind == CFDefinition.Name.Kind.STATIC)
                 toFetch.add(name.name);
         }
         return toFetch;
@@ -307,12 +313,14 @@ public abstract class Selection
     // Special cased selection for when no function is used (this save some allocations).
     private static class SimpleSelection extends Selection
     {
-        public SimpleSelection(List<CFDefinition.Name> columnsList)
+        private final boolean isWildcard;
+
+        public SimpleSelection(List<CFDefinition.Name> columnsList, boolean isWildcard)
         {
-            this(columnsList, new ArrayList<ColumnSpecification>(columnsList));
+            this(columnsList, new ArrayList<ColumnSpecification>(columnsList), isWildcard);
         }
 
-        public SimpleSelection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata)
+        public SimpleSelection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata, boolean isWildcard)
         {
             /*
              * In theory, even a simple selection could have multiple time the same column, so we
@@ -320,12 +328,19 @@ public abstract class Selection
              * get much duplicate in practice, it's more efficient not to bother.
              */
             super(columnsList, metadata, false, false);
+            this.isWildcard = isWildcard;
         }
 
         protected List<ByteBuffer> handleRow(ResultSetBuilder rs)
         {
             return rs.current;
         }
+
+        @Override
+        public boolean isWildcard()
+        {
+            return isWildcard;
+        }
     }
 
     private interface Selector extends AssignementTestable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index a387962..0e6481b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -35,9 +35,9 @@ public class UpdateStatement extends ModificationStatement
 {
     private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-    private UpdateStatement(CFMetaData cfm, Attributes attrs)
+    private UpdateStatement(StatementType type, CFMetaData cfm, Attributes attrs)
     {
-        super(cfm, attrs);
+        super(type, cfm, attrs);
     }
 
     public boolean requireFullClusteringKey()
@@ -72,9 +72,9 @@ public class UpdateStatement extends ModificationStatement
         if (cfDef.isCompact)
         {
             if (builder.componentCount() == 0)
-                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.columns.values().iterator().next()));
+                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.clusteringColumns().iterator().next()));
 
-            if (cfDef.value == null)
+            if (cfDef.compactValue() == null)
             {
                 // compact + no compact value implies there is no column outside the PK. So no operation could
                 // have passed through validation
@@ -85,7 +85,7 @@ public class UpdateStatement extends ModificationStatement
             {
                 // compact means we don't have a row marker, so don't accept to set only the PK. See CASSANDRA-5648.
                 if (updates.isEmpty())
-                    throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfDef.value));
+                    throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfDef.compactValue()));
 
                 for (Operation update : updates)
                     update.execute(key, cf, builder.copy(), params);
@@ -131,7 +131,7 @@ public class UpdateStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.INSERT, cfDef.cfm, attrs);
 
             // Created from an INSERT
             if (stmt.isCounter())
@@ -159,10 +159,11 @@ public class UpdateStatement extends ModificationStatement
                     case COLUMN_ALIAS:
                         Term t = value.prepare(name);
                         t.collectMarkerSpecification(boundNames);
-                        stmt.addKeyValue(name.name, t);
+                        stmt.addKeyValue(name, t);
                         break;
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
+                    case STATIC:
                         Operation operation = new Operation.SetValue(value).prepare(name);
                         operation.collectMarkerSpecification(boundNames);
                         stmt.addOperation(operation);
@@ -192,7 +193,7 @@ public class UpdateStatement extends ModificationStatement
                             Attributes.Raw attrs,
                             List<Pair<ColumnIdentifier, Operation.RawUpdate>> updates,
                             List<Relation> whereClause,
-                            List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions)
+                            List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions)
         {
             super(name, attrs, conditions, false);
             this.updates = updates;
@@ -201,7 +202,7 @@ public class UpdateStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, cfDef.cfm, attrs);
 
             for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
             {
@@ -219,6 +220,7 @@ public class UpdateStatement extends ModificationStatement
                         throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
                     case VALUE_ALIAS:
                     case COLUMN_METADATA:
+                    case STATIC:
                         stmt.addOperation(operation);
                         break;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index e0c576e..9eff12a 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -63,7 +63,6 @@ public class ColumnSlice
         return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
     }
 
-
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 90e7089..eb618f4 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -238,11 +235,28 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
                         // slightly wasteful, but this probably doesn't matter in practice and it simplify things.
-                        SliceQueryFilter dataFilter = new SliceQueryFilter(start,
-                                                                           entry.indexedEntryEnd(),
-                                                                           false,
-                                                                           Integer.MAX_VALUE,
-                                                                           baseCfs.metadata.clusteringKeyColumns().size());
+                        ColumnSlice dataSlice = new ColumnSlice(start, entry.indexedEntryEnd());
+                        ColumnSlice[] slices;
+                        if (baseCfs.metadata.hasStaticColumns())
+                        {
+                            // If the table has static columns, we must fetch them too as they may need to be returned too.
+                            // Note that this is potentially wasteful for 2 reasons:
+                            //  1) we will retrieve the static parts for each indexed row, even if we have more than one row in
+                            //     the same partition. If we were to group data queries to rows on the same slice, which would
+                            //     speed up things in general, we would also optimize here since we would fetch static columns only
+                            //     once for each group.
+                            //  2) at this point we don't know if the user asked for static columns or not, so we might be fetching
+                            //     them for nothing. We would however need to ship the list of "CQL3 columns selected" with getRangeSlice
+                            //     to be able to know that.
+                            // TODO: we should improve both point above
+                            ColumnSlice staticSlice = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, baseCfs.metadata.getStaticColumnNameBuilder().buildAsEndOfRange());
+                            slices = new ColumnSlice[]{ staticSlice, dataSlice };
+                        }
+                        else
+                        {
+                            slices = new ColumnSlice[]{ dataSlice };
+                        }
+                        SliceQueryFilter dataFilter = new SliceQueryFilter(slices, false, Integer.MAX_VALUE, baseCfs.metadata.clusteringKeyColumns().size());
                         ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
                         if (newData == null || index.isStale(entry, newData, filter.timestamp))
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index d002aa7..9250b0f 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -34,6 +34,7 @@ import java.util.List;
  */
 public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
 {
+
     // changes bb position
     protected static int getShortLength(ByteBuffer bb)
     {
@@ -41,6 +42,13 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         return length | (bb.get() & 0xFF);
     }
 
+    // Doesn't change bb position
+    protected static int getShortLength(ByteBuffer bb, int position)
+    {
+        int length = (bb.get(position) & 0xFF) << 8;
+        return length | (bb.get(position + 1) & 0xFF);
+    }
+
     // changes bb position
     protected static void putShortLength(ByteBuffer bb, int length)
     {
@@ -66,11 +74,17 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
 
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
-        if (o1 == null)
-            return o2 == null ? 0 : -1;
+        if (o1 == null || !o1.hasRemaining())
+            return o2 == null || !o2.hasRemaining() ? 0 : -1;
 
         ByteBuffer bb1 = o1.duplicate();
         ByteBuffer bb2 = o2.duplicate();
+
+        boolean isStatic1 = readIsStatic(bb1);
+        boolean isStatic2 = readIsStatic(bb2);
+        if (isStatic1 != isStatic2)
+            return isStatic1 ? -1 : 1;
+
         int i = 0;
 
         ByteBuffer previous = null;
@@ -90,22 +104,9 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
 
             byte b1 = bb1.get();
             byte b2 = bb2.get();
-            if (b1 < 0)
-            {
-                if (b2 >= 0)
-                    return -1;
-            }
-            else if (b1 > 0)
-            {
-                if (b2 <= 0)
-                    return 1;
-            }
-            else
-            {
-                // b1 == 0
-                if (b2 != 0)
-                    return -b2;
-            }
+            if (b1 != b2)
+                return b1 - b2;
+
             ++i;
         }
 
@@ -116,6 +117,10 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         return 1;
     }
 
+    // Check if the provided BB represents a static name and advance the
+    // buffer to the real beginning if so.
+    protected abstract boolean readIsStatic(ByteBuffer bb);
+
     /**
      * Split a composite column names into it's components.
      */
@@ -123,6 +128,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
     {
         List<ByteBuffer> l = new ArrayList<ByteBuffer>();
         ByteBuffer bb = name.duplicate();
+        readIsStatic(bb);
         int i = 0;
         while (bb.remaining() > 0)
         {
@@ -150,6 +156,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         List<CompositeComponent> list = new ArrayList<CompositeComponent>();
 
         ByteBuffer bb = bytes.duplicate();
+        readIsStatic(bb);
         int i = 0;
 
         while (bb.remaining() > 0)
@@ -219,8 +226,9 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
     {
         StringBuilder sb = new StringBuilder();
         ByteBuffer bb = bytes.duplicate();
-        int i = 0;
+        readIsStatic(bb);
 
+        int i = 0;
         while (bb.remaining() > 0)
         {
             if (bb.remaining() != bytes.remaining())
@@ -290,6 +298,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
     public void validate(ByteBuffer bytes) throws MarshalException
     {
         ByteBuffer bb = bytes.duplicate();
+        readIsStatic(bb);
 
         int i = 0;
         ByteBuffer previous = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 2c0e121..83e3b97 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -42,8 +42,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  *   <component><component><component> ...
  * where <component> is:
  *   <length of value><value><'end-of-component' byte>
- * where <length of value> is a 2 bytes unsigned short the and the
- * 'end-of-component' byte should always be 0 for actual column name.
+ * where <length of value> is a 2 bytes unsigned short (but 0xFFFF is invalid, see
+ * below) and the 'end-of-component' byte should always be 0 for actual column name.
  * However, it can set to 1 for query bounds. This allows to query for the
  * equivalent of 'give me the full super-column'. That is, if during a slice
  * query uses:
@@ -56,9 +56,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  *   start = <3><"foo".getBytes()><-1>
  * allows to query everything that is greater than <3><"foo".getBytes()>, but
  * not <3><"foo".getBytes()> itself.
+ *
+ * On top of that, CQL3 uses a specific prefix (0xFFFF) to encode "static columns"
+ * (CASSANDRA-6561). This does mean the maximum size of the first component of a
+ * composite is 65534, not 65535 (or we wouldn't be able to detect if the first 2
+ * bytes is the static prefix or not).
  */
 public class CompositeType extends AbstractCompositeType
 {
+    public static final int STATIC_MARKER = 0xFFFF;
+
     public final List<AbstractType<?>> types;
 
     // interning instances
@@ -74,6 +81,24 @@ public class CompositeType extends AbstractCompositeType
         return getInstance(Arrays.<AbstractType<?>>asList(types));
     }
 
+    protected boolean readIsStatic(ByteBuffer bb)
+    {
+        return readStatic(bb);
+    }
+
+    private static boolean readStatic(ByteBuffer bb)
+    {
+        if (bb.remaining() < 2)
+            return false;
+
+        int header = getShortLength(bb, bb.position());
+        if ((header & 0xFFFF) != STATIC_MARKER)
+            return false;
+
+        getShortLength(bb); // Skip header
+        return true;
+    }
+
     public static synchronized CompositeType getInstance(List<AbstractType<?>> types)
     {
         assert types != null && !types.isEmpty();
@@ -149,6 +174,7 @@ public class CompositeType extends AbstractCompositeType
     public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
     {
         bb = bb.duplicate();
+        readStatic(bb);
         int i = 0;
         while (bb.remaining() > 0)
         {
@@ -169,6 +195,11 @@ public class CompositeType extends AbstractCompositeType
         return extractComponent(bb, idx);
     }
 
+    public static boolean isStaticName(ByteBuffer bb)
+    {
+        return bb.remaining() >= 2 && (getShortLength(bb, bb.position()) & 0xFFFF) == STATIC_MARKER;
+    }
+
     @Override
     public int componentsCount()
     {
@@ -317,24 +348,33 @@ public class CompositeType extends AbstractCompositeType
         private final List<ByteBuffer> components;
         private final byte[] endOfComponents;
         private int serializedSize;
+        private final boolean isStatic;
 
         public Builder(CompositeType composite)
         {
-            this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()]);
+            this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
+        }
+
+        public static Builder staticBuilder(CompositeType composite)
+        {
+            return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
         }
 
-        public Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents)
+        private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
         {
             assert endOfComponents.length == composite.types.size();
 
             this.composite = composite;
             this.components = components;
             this.endOfComponents = endOfComponents;
+            this.isStatic = isStatic;
+            if (isStatic)
+                serializedSize = 2;
         }
 
         private Builder(Builder b)
         {
-            this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length));
+            this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
             this.serializedSize = b.serializedSize;
         }
 
@@ -344,6 +384,7 @@ public class CompositeType extends AbstractCompositeType
                 throw new IllegalStateException("Composite column is already fully constructed");
 
             components.add(bb);
+            serializedSize += 3 + bb.remaining(); // 2 bytes lenght + 1 byte eoc
             return this;
         }
 
@@ -364,20 +405,23 @@ public class CompositeType extends AbstractCompositeType
 
         public ByteBuffer build()
         {
-            DataOutputBuffer out = new DataOutputBuffer(serializedSize);
-            for (int i = 0; i < components.size(); i++)
+            try
             {
-                try
+                DataOutputBuffer out = new DataOutputBuffer(serializedSize);
+                if (isStatic)
+                    out.writeShort(STATIC_MARKER);
+
+                for (int i = 0; i < components.size(); i++)
                 {
                     ByteBufferUtil.writeWithShortLength(components.get(i), out);
+                    out.write(endOfComponents[i]);
                 }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                out.write(endOfComponents[i]);
+                return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
             }
-            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
         }
 
         public ByteBuffer buildAsEndOfRange()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
index 35e6e33..9b56a82 100644
--- a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
@@ -75,6 +75,12 @@ public class DynamicCompositeType extends AbstractCompositeType
         this.aliases = aliases;
     }
 
+    protected boolean readIsStatic(ByteBuffer bb)
+    {
+        // We don't have the static nothing for DCT
+        return false;
+    }
+
     private AbstractType<?> getComparator(ByteBuffer bb)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 3fb1c5a..65e3be1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
@@ -674,11 +675,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
             // otherwise for CqlStorage, check metadata for classic thrift tables
             CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-            for (ColumnIdentifier column : cfDefinition.metadata.keySet())
+            for (CFDefinition.Name column : Iterables.concat(cfDefinition.staticColumns(), cfDefinition.regularColumns()))
             {
                 ColumnDef cDef = new ColumnDef();
-                String columnName = column.toString();
-                String type = cfDefinition.metadata.get(column).type.toString();
+                String columnName = column.name.toString();
+                String type = column.type.toString();
                 logger.debug("name: {}, type: {} ", columnName, type);
                 cDef.name = ByteBufferUtil.bytes(columnName);
                 cDef.validation_class = type;
@@ -688,12 +689,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             // could have already processed it as schema_columnfamilies.value_alias
             if (columnDefs.size() == 0 && includeCompactValueColumn)
             {
-                String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
+                String value = cfDefinition.compactValue() != null ? cfDefinition.compactValue().toString() : null;
                 if ("value".equals(value))
                 {
                     ColumnDef cDef = new ColumnDef();
                     cDef.name = ByteBufferUtil.bytes(value);
-                    cDef.validation_class = cfDefinition.value.type.toString();
+                    cDef.validation_class = cfDefinition.compactValue().type.toString();
                     columnDefs.add(cDef);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index c6db82f..a5156b9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -524,17 +524,17 @@ public class CqlStorage extends AbstractCassandraStorage
             if (keys.size() == 0)
             {
                 CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
-                for (ColumnIdentifier column : cfDefinition.keys.keySet())
+                for (CFDefinition.Name column : cfDefinition.partitionKeys())
                 {
-                    String key = column.toString();
+                    String key = column.name.toString();
                     logger.debug("name: {} ", key);
                     ColumnDef cDef = new ColumnDef();
                     cDef.name = ByteBufferUtil.bytes(key);
                     keys.add(cDef);
                 }
-                for (ColumnIdentifier column : cfDefinition.columns.keySet())
+                for (CFDefinition.Name column : cfDefinition.clusteringColumns())
                 {
-                    String key = column.toString();
+                    String key = column.name.toString();
                     logger.debug("name: {} ", key);
                     ColumnDef cDef = new ColumnDef();
                     cDef.name = ByteBufferUtil.bytes(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java
index d4b3e19..c0a2111 100644
--- a/src/java/org/apache/cassandra/service/CASConditions.java
+++ b/src/java/org/apache/cassandra/service/CASConditions.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
  * Abstract the conditions to be fulfilled by a CAS operation.
@@ -34,5 +35,5 @@ public interface CASConditions
      * Returns whether the provided CF, that represents the values fetched using the
      * readFilter(), match the CAS conditions this object stands for.
      */
-    public boolean appliesTo(ColumnFamily current);
+    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index f2efc03..2b435ff 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -245,7 +245,8 @@ public class ThriftValidation
                     continue; // Row marker, ok
 
                 ColumnIdentifier columnId = new ColumnIdentifier(CQL3ColumnName, composite.types.get(columnIndex));
-                if (cfDef.metadata.get(columnId) == null)
+                CFDefinition.Name columnName = cfDef.get(columnId);
+                if (columnName == null || columnName.isPrimaryKeyColumn())
                     throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("Invalid cell for CQL3 table %s. The CQL3 column component (%s) does not correspond to a defined CQL3 column",
                                                                                                     metadata.cfName, columnId));
 


[2/2] git commit: Add static columns in CQL3

Posted by sl...@apache.org.
Add static columns in CQL3

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6561


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09d8769
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09d8769
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09d8769

Branch: refs/heads/cassandra-2.0
Commit: b09d876914ad9c9fdf1af35cf48cdb98c27bbf32
Parents: b2b3055
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 9 18:44:21 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 20 10:15:02 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/cql3/CQL.textile                            |  29 +-
 .../org/apache/cassandra/config/CFMetaData.java |  35 +-
 .../cassandra/config/ColumnDefinition.java      |   9 +-
 .../org/apache/cassandra/cql3/CFDefinition.java | 123 +++++--
 .../apache/cassandra/cql3/ColumnCondition.java  | 191 +++++++++++
 .../org/apache/cassandra/cql3/Constants.java    |   5 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  29 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |   7 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |   8 +-
 .../org/apache/cassandra/cql3/Operation.java    |  19 ++
 src/java/org/apache/cassandra/cql3/Sets.java    |   6 +-
 .../cassandra/cql3/functions/TokenFct.java      |   4 +-
 .../cql3/statements/AlterTableStatement.java    |  32 +-
 .../cql3/statements/BatchStatement.java         | 158 +++++++--
 .../cql3/statements/CQL3CasConditions.java      | 164 +++++++++
 .../cql3/statements/ColumnGroupMap.java         |  29 +-
 .../cql3/statements/CreateIndexStatement.java   |   9 +
 .../cql3/statements/CreateTableStatement.java   |  39 ++-
 .../cql3/statements/DeleteStatement.java        |  33 +-
 .../cql3/statements/ModificationStatement.java  | 331 +++++++++++--------
 .../cql3/statements/SelectStatement.java        | 258 ++++++++++++---
 .../cassandra/cql3/statements/Selection.java    |  33 +-
 .../cql3/statements/UpdateStatement.java        |  20 +-
 .../apache/cassandra/db/filter/ColumnSlice.java |   1 -
 .../db/index/composites/CompositesSearcher.java |  32 +-
 .../db/marshal/AbstractCompositeType.java       |  47 +--
 .../cassandra/db/marshal/CompositeType.java     |  72 +++-
 .../db/marshal/DynamicCompositeType.java        |   6 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   8 +-
 .../apache/cassandra/service/CASConditions.java |   3 +-
 .../cassandra/thrift/ThriftValidation.java      |   3 +-
 33 files changed, 1384 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fffb9a..bbacc4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
  * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
  * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
  * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
+ * Add static columns to CQL3 (CASSANDRA-6561)
 Merged from 1.2:
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 03b95e0..d872bde 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -219,7 +219,7 @@ bc(syntax)..
                           '(' <definition> ( ',' <definition> )* ')'
                           ( WITH <option> ( AND <option>)* )?
 
-<column-definition> ::= <identifier> <type> ( PRIMARY KEY )?
+<column-definition> ::= <identifier> <type> ( STATIC )? ( PRIMARY KEY )?
                       | PRIMARY KEY '(' <partition-key> ( ',' <identifier> )* ')'
 
 <partition-key> ::= <partition-key>
@@ -288,11 +288,35 @@ In CQL, the order in which columns are defined for the @PRIMARY KEY@ matters. Th
 
 The remaining columns of the @PRIMARY KEY@ definition, if any, are called __clustering columns. On a given physical node, rows for a given partition key are stored in the order induced by the clustering columns, making the retrieval of rows in that clustering order particularly efficient (see <a href="#selectStmt"><tt>SELECT</tt></a>).
 
+h4(#createTableStatic). @STATIC@ columns
+
+Some columns can be declared as @STATIC@ in a table definition. A column that is static will be "shared" by all the rows belonging to the same partition (having the same partition key). For instance, in:
+
+bc(sample). 
+CREATE TABLE test (
+    pk int,
+    t int,
+    v text,
+    s text static,
+    PRIMARY KEY (pk, t)
+);
+INSERT INTO test(pk, t, v, s) VALUES (0, 0, 'val0', 'static0');
+INSERT INTO test(pk, t, v, s) VALUES (0, 1, 'val1', 'static1');
+SELECT * FROM test WHERE pk=0 AND t=0;
+
+the last query will return @'static1'@ as value for @s@, since @s@ is static and thus the 2nd insertion modified this "shared" value. Note however that static columns are only static within a given partition, and if in the example above both rows where from different partitions (i.e. if they had different value for @pk@), then the 2nd insertion would not have modified the value of @s@ for the first row.
+
+A few restrictions applies to when static columns are allowed:
+* tables with the @COMPACT STORAGE@ option (see below) cannot have them
+* a table without clustering columns cannot have static columns (in a table without clustering columns, every partition has only one row, and so every column is inherently static).
+* only non @PRIMARY KEY@ columns can be static
+
+
 h4(#createTableOptions). @<option>@
 
 The @CREATE TABLE@ statement supports a number of options that controls the configuration of a new table. These options can be specified after the @WITH@ keyword.
 
-The first of these option is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details).  The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table.  Most notably, @COMPACT STORAGE@ tables cannot have collections and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which imply in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
+The first of these option is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details).  The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table.  Most notably, @COMPACT STORAGE@ tables cannot have collections nor static columns and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which imply in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
 
 Another option is @CLUSTERING ORDER@. It allows to define the ordering of rows on disk. It takes the list of the clustering column names with, for each of them, the on-disk order (Ascending or descending). Note that this option affects "what @ORDER BY@ are allowed during @SELECT@":#selectOrderBy.
 
@@ -1116,6 +1140,7 @@ The following describes the addition/changes brought for each version of CQL.
 h3. 3.1.5
 
 * It is now possible to group clustering columns in a relatiion, see "SELECT Where clauses":#selectWhere.
+* Added support for @STATIC@ columns, see "static in CREATE TABLE":#createTableStatic.
 
 h3. 3.1.4
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 714a8bc..a319930 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -27,6 +27,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.ArrayUtils;
@@ -408,6 +409,7 @@ public final class CFMetaData
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringKeyColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
     private volatile Set<ColumnDefinition> regularColumns;
+    private volatile Set<ColumnDefinition> staticColumns;
     private volatile ColumnDefinition compactValueColumn;
 
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -721,6 +723,16 @@ public final class CFMetaData
         return regularColumns;
     }
 
+    public Set<ColumnDefinition> staticColumns()
+    {
+        return staticColumns;
+    }
+
+    public Iterable<ColumnDefinition> regularAndStaticColumns()
+    {
+        return Iterables.concat(staticColumns, regularColumns);
+    }
+
     public ColumnDefinition compactValueColumn()
     {
         return compactValueColumn;
@@ -1328,7 +1340,7 @@ public final class CFMetaData
         // Mixing counter with non counter columns is not supported (#2614)
         if (defaultValidator instanceof CounterColumnType)
         {
-            for (ColumnDefinition def : regularColumns)
+            for (ColumnDefinition def : regularAndStaticColumns())
                 if (!(def.getValidator() instanceof CounterColumnType))
                     throw new ConfigurationException("Cannot add a non counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a counter column family");
         }
@@ -1839,7 +1851,7 @@ public final class CFMetaData
         if (column_metadata.get(to) != null)
             throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", strFrom, strTo, cfName));
 
-        if (def.type == ColumnDefinition.Type.REGULAR)
+        if (def.type == ColumnDefinition.Type.REGULAR || def.type == ColumnDefinition.Type.STATIC)
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", strFrom));
         }
@@ -1883,6 +1895,7 @@ public final class CFMetaData
                      : comparator.componentsCount() - (hasCollection() ? 2 : 1);
         List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
         Set<ColumnDefinition> regCols = new HashSet<ColumnDefinition>();
+        Set<ColumnDefinition> statCols = new HashSet<ColumnDefinition>();
         ColumnDefinition compactCol = null;
 
         for (ColumnDefinition def : column_metadata.values())
@@ -1900,6 +1913,9 @@ public final class CFMetaData
                 case REGULAR:
                     regCols.add(def);
                     break;
+                case STATIC:
+                    statCols.add(def);
+                    break;
                 case COMPACT_VALUE:
                     assert compactCol == null : "There shouldn't be more than one compact value defined: got " + compactCol + " and " + def;
                     compactCol = def;
@@ -1911,6 +1927,7 @@ public final class CFMetaData
         partitionKeyColumns = addDefaultKeyAliases(pkCols);
         clusteringKeyColumns = addDefaultColumnAliases(ckCols);
         regularColumns = regCols;
+        staticColumns = statCols;
         compactValueColumn = addDefaultValueAlias(compactCol, isDense);
     }
 
@@ -2074,6 +2091,20 @@ public final class CFMetaData
         return true;
     }
 
+    public boolean hasStaticColumns()
+    {
+        return !staticColumns.isEmpty();
+    }
+
+    public ColumnNameBuilder getStaticColumnNameBuilder()
+    {
+        assert comparator instanceof CompositeType && clusteringKeyColumns().size() > 0;
+        CompositeType.Builder builder = CompositeType.Builder.staticBuilder((CompositeType)comparator);
+        for (int i = 0; i < clusteringKeyColumns().size(); i++)
+            builder.add(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        return builder;
+    }
+
     public void validateColumns(Iterable<Column> columns)
     {
         for (Column column : columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 7ca4d45..11340e7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -64,7 +64,8 @@ public class ColumnDefinition
         PARTITION_KEY,
         CLUSTERING_KEY,
         REGULAR,
-        COMPACT_VALUE
+        COMPACT_VALUE,
+        STATIC
     }
 
     public final ByteBuffer name;
@@ -96,6 +97,11 @@ public class ColumnDefinition
         return new ColumnDefinition(name, validator, componentIndex, Type.REGULAR);
     }
 
+    public static ColumnDefinition staticDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    {
+        return new ColumnDefinition(name, validator, componentIndex, Type.STATIC);
+    }
+
     public static ColumnDefinition compactValueDef(ByteBuffer name, AbstractType<?> validator)
     {
         return new ColumnDefinition(name, validator, null, Type.COMPACT_VALUE);
@@ -174,6 +180,7 @@ public class ColumnDefinition
 
     public boolean isThriftCompatible()
     {
+        // componentIndex == null should always imply isStatic in practice, but there is no harm in being too careful here.
         return type == ColumnDefinition.Type.REGULAR && componentIndex == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 638770d..b589a95 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -42,11 +42,12 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
 
     public final CFMetaData cfm;
     // LinkedHashMap because the order does matter (it is the order in the composite type)
-    public final LinkedHashMap<ColumnIdentifier, Name> keys = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final LinkedHashMap<ColumnIdentifier, Name> columns = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final Name value;
+    private final LinkedHashMap<ColumnIdentifier, Name> partitionKeys = new LinkedHashMap<ColumnIdentifier, Name>();
+    private final LinkedHashMap<ColumnIdentifier, Name> clusteringColumns = new LinkedHashMap<ColumnIdentifier, Name>();
+    private final Name compactValue;
     // Keep metadata lexicographically ordered so that wildcard expansion have a deterministic order
-    public final Map<ColumnIdentifier, Name> metadata = new TreeMap<ColumnIdentifier, Name>();
+    private final Map<ColumnIdentifier, Name> staticColumns = new TreeMap<ColumnIdentifier, Name>();
+    private final Map<ColumnIdentifier, Name> regularColumns = new TreeMap<ColumnIdentifier, Name>();
 
     public final boolean isComposite;
     public final boolean hasCompositeKey;
@@ -65,7 +66,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         for (int i = 0; i < cfm.partitionKeyColumns().size(); ++i)
         {
             ColumnIdentifier id = new ColumnIdentifier(cfm.partitionKeyColumns().get(i).name, definitionType);
-            this.keys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
+            this.partitionKeys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
         }
 
         this.isComposite = cfm.comparator instanceof CompositeType;
@@ -74,20 +75,25 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         for (int i = 0; i < cfm.clusteringKeyColumns().size(); ++i)
         {
             ColumnIdentifier id = new ColumnIdentifier(cfm.clusteringKeyColumns().get(i).name, definitionType);
-            this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
+            this.clusteringColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
         }
 
         if (isCompact)
         {
-            this.value = createValue(cfm);
+            this.compactValue = createValue(cfm);
         }
         else
         {
-            this.value = null;
+            this.compactValue = null;
             for (ColumnDefinition def : cfm.regularColumns())
             {
                 ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
-                this.metadata.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+                this.regularColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+            }
+            for (ColumnDefinition def : cfm.staticColumns())
+            {
+                ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
+                this.staticColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.STATIC, def.getValidator()));
             }
         }
     }
@@ -111,44 +117,86 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
                : new Name(cfm.ksName, cfm.cfName, alias, Name.Kind.VALUE_ALIAS, cfm.getDefaultValidator());
     }
 
+    public int partitionKeyCount()
+    {
+        return partitionKeys.size();
+    }
+
+    public Collection<Name> partitionKeys()
+    {
+        return partitionKeys.values();
+    }
+
+    public int clusteringColumnsCount()
+    {
+        return clusteringColumns.size();
+    }
+
+    public Collection<Name> clusteringColumns()
+    {
+        return clusteringColumns.values();
+    }
+
+    public Collection<Name> regularColumns()
+    {
+        return regularColumns.values();
+    }
+
+    public Collection<Name> staticColumns()
+    {
+        return regularColumns.values();
+    }
+
+    public Name compactValue()
+    {
+        return compactValue;
+    }
+
     public Name get(ColumnIdentifier name)
     {
-        CFDefinition.Name kdef = keys.get(name);
-        if (kdef != null)
-            return kdef;
-        if (value != null && name.equals(value.name))
-            return value;
-        CFDefinition.Name def = columns.get(name);
+        CFDefinition.Name def = partitionKeys.get(name);
         if (def != null)
             return def;
-        return metadata.get(name);
+        if (compactValue != null && name.equals(compactValue.name))
+            return compactValue;
+        def = clusteringColumns.get(name);
+        if (def != null)
+            return def;
+        def = regularColumns.get(name);
+        if (def != null)
+            return def;
+        return staticColumns.get(name);
     }
 
     public Iterator<Name> iterator()
     {
         return new AbstractIterator<Name>()
         {
-            private final Iterator<Name> keyIter = keys.values().iterator();
-            private final Iterator<Name> columnIter = columns.values().iterator();
+            private final Iterator<Name> keyIter = partitionKeys.values().iterator();
+            private final Iterator<Name> clusteringIter = clusteringColumns.values().iterator();
             private boolean valueDone;
-            private final Iterator<Name> metadataIter = metadata.values().iterator();
+            private final Iterator<Name> staticIter = staticColumns.values().iterator();
+            private final Iterator<Name> regularIter = regularColumns.values().iterator();
 
             protected Name computeNext()
             {
                 if (keyIter.hasNext())
                     return keyIter.next();
 
-                if (columnIter.hasNext())
-                    return columnIter.next();
+                if (clusteringIter.hasNext())
+                    return clusteringIter.next();
 
-                if (value != null && !valueDone)
+                if (compactValue != null && !valueDone)
                 {
                     valueDone = true;
-                    return value;
+                    return compactValue;
                 }
 
-                if (metadataIter.hasNext())
-                    return metadataIter.next();
+                if (staticIter.hasNext())
+                    return staticIter.next();
+
+                if (regularIter.hasNext())
+                    return regularIter.next();
 
                 return endOfData();
             }
@@ -173,7 +221,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
     {
         public static enum Kind
         {
-            KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA
+            KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA, STATIC
         }
 
         private Name(String ksName, String cfName, ColumnIdentifier name, Kind kind, AbstractType<?> type)
@@ -210,20 +258,29 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         {
             return Objects.hashCode(ksName, cfName, name, type, kind, position);
         }
+
+        public boolean isPrimaryKeyColumn()
+        {
+            return kind == Kind.KEY_ALIAS || kind == Kind.COLUMN_ALIAS;
+        }
     }
 
     @Override
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        sb.append(Joiner.on(", ").join(keys.values()));
-        if (!columns.isEmpty())
-            sb.append(", ").append(Joiner.on(", ").join(columns.values()));
+        sb.append(Joiner.on(", ").join(partitionKeys.values()));
+        if (!clusteringColumns.isEmpty())
+            sb.append(", ").append(Joiner.on(", ").join(clusteringColumns.values()));
         sb.append(" => ");
-        if (value != null)
-            sb.append(value.name);
-        if (!metadata.isEmpty())
-            sb.append("{").append(Joiner.on(", ").join(metadata.values())).append(" }");
+        if (compactValue != null)
+            sb.append(compactValue.name);
+        sb.append("{");
+        sb.append(Joiner.on(", ").join(staticColumns.values()));
+        if (!staticColumns.isEmpty())
+            sb.append(", ");
+        sb.append(Joiner.on(", ").join(regularColumns.values()));
+        sb.append("}");
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
new file mode 100644
index 0000000..797dba6
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A CQL3 condition.
+ */
+public class ColumnCondition
+{
+    public final CFDefinition.Name column;
+    private final Term value;
+
+    private List<ByteBuffer> variables;
+
+    private ColumnCondition(CFDefinition.Name column, Term value)
+    {
+        this.column = column;
+        this.value = value;
+    }
+
+    // The only ones we support so far
+    public static ColumnCondition equal(CFDefinition.Name column, Term value)
+    {
+        return new ColumnCondition(column, value);
+    }
+
+    // See CQL3CasConditions for why it's convenient to have this
+    public ColumnCondition attach(List<ByteBuffer> variables)
+    {
+        this.variables = variables;
+        return this;
+    }
+
+    /**
+     * Collects the column specification for the bind variables of this operation.
+     *
+     * @param boundNames the list of column specification where to collect the
+     * bind variables of this term in.
+     */
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        value.collectMarkerSpecification(boundNames);
+    }
+
+    // Not overriding equals() because we need the variables to have been attached when this is
+    // called and so having a non standard method name might help avoid mistakes
+    public boolean equalsTo(ColumnCondition other) throws InvalidRequestException
+    {
+        return column.equals(other.column)
+            && value.bindAndGet(variables).equals(other.value.bindAndGet(other.variables));
+    }
+
+    private ColumnNameBuilder copyOrUpdatePrefix(CFMetaData cfm, ColumnNameBuilder rowPrefix)
+    {
+        return column.kind == CFDefinition.Name.Kind.STATIC ? cfm.getStaticColumnNameBuilder() : rowPrefix.copy();
+    }
+
+    /**
+     * Validates whether this condition applies to {@code current}.
+     */
+    public boolean appliesTo(ColumnNameBuilder rowPrefix, ColumnFamily current, long now) throws InvalidRequestException
+    {
+        if (column.type instanceof CollectionType)
+            return collectionAppliesTo((CollectionType)column.type, rowPrefix, current, now);
+
+        Column c = current.getColumn(copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key).build());
+        ByteBuffer v = value.bindAndGet(variables);
+        return v == null
+             ? c == null || !c.isLive(now)
+             : c != null && c.isLive(now) && c.value().equals(v);
+    }
+
+    private boolean collectionAppliesTo(CollectionType type, ColumnNameBuilder rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
+    {
+        ColumnNameBuilder collectionPrefix = copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key);
+        // We are testing for collection equality, so we need to have the expected values *and* only those.
+        ColumnSlice[] collectionSlice = new ColumnSlice[]{ new ColumnSlice(collectionPrefix.build(), collectionPrefix.buildAsEndOfRange()) };
+        // Filter live columns, this makes things simpler afterwards
+        Iterator<Column> iter = Iterators.filter(current.iterator(collectionSlice), new Predicate<Column>()
+        {
+            public boolean apply(Column c)
+            {
+                // we only care about live columns
+                return c.isLive(now);
+            }
+        });
+
+        Term.Terminal v = value.bind(variables);
+        if (v == null)
+            return !iter.hasNext();
+
+        switch (type.kind)
+        {
+            case LIST: return listAppliesTo(current.metadata(), iter, ((Lists.Value)v).elements);
+            case SET: return setAppliesTo(current.metadata(), iter, ((Sets.Value)v).elements);
+            case MAP: return mapAppliesTo(current.metadata(), iter, ((Maps.Value)v).map);
+        }
+        throw new AssertionError();
+    }
+
+    private static ByteBuffer collectionKey(CFMetaData cfm, Column c)
+    {
+        ByteBuffer[] bbs = ((CompositeType)cfm.comparator).split(c.name());
+        return bbs[bbs.length - 1];
+    }
+
+    private boolean listAppliesTo(CFMetaData cfm, Iterator<Column> iter, List<ByteBuffer> elements)
+    {
+        for (ByteBuffer e : elements)
+            if (!iter.hasNext() || iter.next().value().equals(e))
+                return false;
+        // We must not have more elements than expected
+        return !iter.hasNext();
+    }
+
+    private boolean setAppliesTo(CFMetaData cfm, Iterator<Column> iter, Set<ByteBuffer> elements)
+    {
+        Set<ByteBuffer> remaining = new HashSet<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            if (!remaining.remove(collectionKey(cfm, iter.next())))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    private boolean mapAppliesTo(CFMetaData cfm, Iterator<Column> iter, Map<ByteBuffer, ByteBuffer> elements)
+    {
+        Map<ByteBuffer, ByteBuffer> remaining = new HashMap<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            Column c = iter.next();
+            if (!remaining.remove(collectionKey(cfm, c)).equals(c.value()))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    public static class Raw
+    {
+        private final Term.Raw value;
+
+        public Raw(Term.Raw value)
+        {
+            this.value = value;
+        }
+
+        public ColumnCondition prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        {
+            if (receiver.type instanceof CounterColumnType)
+                throw new InvalidRequestException("Condtions on counters are not supported");
+
+            return ColumnCondition.equal(receiver, value.prepare(receiver));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index bcfe00d..f99fd02 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -296,6 +296,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             ByteBuffer value = t.bindAndGet(params.variables);
             cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
@@ -315,6 +316,7 @@ public abstract class Constants
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
             long increment = ByteBufferUtil.toLong(bytes);
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             cf.addCounter(cname, increment);
         }
@@ -337,6 +339,7 @@ public abstract class Constants
             if (increment == Long.MIN_VALUE)
                 throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)");
 
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             cf.addCounter(cname, -increment);
         }
@@ -356,7 +359,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
 
             if (isCollection)
                 cf.addAtom(params.makeRangeTombstone(column.build(), column.buildAsEndOfRange()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 6e7cf1c..a11a818 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -351,21 +351,22 @@ updateStatement returns [UpdateStatement.ParsedUpdate expr]
       ( usingClause[attrs] )?
       K_SET columnOperation[operations] (',' columnOperation[operations])*
       K_WHERE wclause=whereClause
-      ( K_IF conditions=updateCondition )?
+      ( K_IF conditions=updateConditions )?
       {
           return new UpdateStatement.ParsedUpdate(cf,
                                                   attrs,
                                                   operations,
                                                   wclause,
-                                                  conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+                                                  conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
      }
     ;
 
-updateCondition returns [List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions]
-    @init { conditions = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>(); }
-    : columnOperation[conditions] ( K_AND columnOperation[conditions] )*
+updateConditions returns [List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+    @init { conditions = new ArrayList<Pair<ColumnIdentifier, ColumnCondition.Raw>>(); }
+    : columnCondition[conditions] ( K_AND columnCondition[conditions] )*
     ;
 
+
 /**
  * DELETE name1, name2
  * FROM <CF>
@@ -381,13 +382,13 @@ deleteStatement returns [DeleteStatement.Parsed expr]
       K_FROM cf=columnFamilyName
       ( usingClauseDelete[attrs] )?
       K_WHERE wclause=whereClause
-      ( K_IF conditions=updateCondition )?
+      ( K_IF conditions=updateConditions )?
       {
           return new DeleteStatement.Parsed(cf,
                                             attrs,
                                             columnDeletions,
                                             wclause,
-                                            conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+                                            conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
       }
     ;
 
@@ -484,7 +485,8 @@ cfamDefinition[CreateTableStatement.RawStatement expr]
     ;
 
 cfamColumns[CreateTableStatement.RawStatement expr]
-    : k=cident v=comparatorType { $expr.addDefinition(k, v); } (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
+    : k=cident v=comparatorType { boolean isStatic=false; } (K_STATIC {isStatic = true;})? { $expr.addDefinition(k, v, isStatic); }
+        (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
     | K_PRIMARY K_KEY '(' pkDef[expr] (',' c=cident { $expr.addColumnAlias(c); } )* ')'
     ;
 
@@ -558,10 +560,11 @@ alterTableStatement returns [AlterTableStatement expr]
         AlterTableStatement.Type type = null;
         CFPropDefs props = new CFPropDefs();
         Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>();
+        boolean isStatic = false;
     }
     : K_ALTER K_COLUMNFAMILY cf=columnFamilyName
           ( K_ALTER id=cident K_TYPE v=comparatorType { type = AlterTableStatement.Type.ALTER; }
-          | K_ADD   id=cident v=comparatorType        { type = AlterTableStatement.Type.ADD; }
+          | K_ADD   id=cident v=comparatorType ({ isStatic=true; } K_STATIC)? { type = AlterTableStatement.Type.ADD; }
           | K_DROP  id=cident                         { type = AlterTableStatement.Type.DROP; }
           | K_WITH  properties[props]                 { type = AlterTableStatement.Type.OPTS; }
           | K_RENAME                                  { type = AlterTableStatement.Type.RENAME; }
@@ -569,7 +572,7 @@ alterTableStatement returns [AlterTableStatement expr]
                ( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); } )*
           )
     {
-        $expr = new AlterTableStatement(cf, type, id, v, props, renames);
+        $expr = new AlterTableStatement(cf, type, id, v, props, renames, isStatic);
     }
     ;
 
@@ -845,6 +848,10 @@ columnOperation[List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations]
       }
     ;
 
+columnCondition[List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+    : key=cident '=' t=term { conditions.add(Pair.create(key, new ColumnCondition.Raw(t))); } // Note: we'll reject duplicates later
+    ;
+
 properties[PropertyDefinitions props]
     : property[props] (K_AND property[props])*
     ;
@@ -981,6 +988,7 @@ unreserved_function_keyword returns [String str]
         | K_CUSTOM
         | K_TRIGGER
         | K_DISTINCT
+        | K_STATIC
         ) { $str = $k.text; }
     | t=native_type { $str = t.toString(); }
     ;
@@ -1084,6 +1092,7 @@ K_NAN:         N A N;
 K_INFINITY:    I N F I N I T Y;
 
 K_TRIGGER:     T R I G G E R;
+K_STATIC:      S T A T I C;
 
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 4ca5eb3..4ad39db 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -266,7 +266,7 @@ public abstract class Lists
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + append
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Appender.doAppend(t, cf, column, params);
         }
@@ -309,6 +309,7 @@ public abstract class Lists
                 throw new InvalidRequestException(String.format("List index %d out of bound, list has size %d", idx, existingList.size()));
 
             ByteBuffer elementName = existingList.get(idx).right.name();
+            // Since we reuse the name we're read, if it's a static column, the static marker will already be set
 
             if (value == null)
             {
@@ -336,7 +337,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAppend(t, cf, prefix.add(columnName.key), params);
+            doAppend(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doAppend(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -376,7 +377,7 @@ public abstract class Lists
             long time = PrecisionTime.REFERENCE_TIME - (System.currentTimeMillis() - PrecisionTime.REFERENCE_TIME);
 
             List<ByteBuffer> toAdd = ((Lists.Value)value).elements;
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             for (int i = 0; i < toAdd.size(); i++)
             {
                 ColumnNameBuilder b = i == toAdd.size() - 1 ? column : column.copy();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index 30d796c..c332999 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -244,7 +244,7 @@ public abstract class Maps
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + put
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Putter.doPut(t, cf, column, params);
         }
@@ -274,7 +274,7 @@ public abstract class Maps
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(key).build();
+            ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(key).build();
 
             if (value == null)
             {
@@ -302,7 +302,7 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doPut(t, cf, prefix.add(columnName.key), params);
+            doPut(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doPut(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -335,7 +335,7 @@ public abstract class Maps
                 throw new InvalidRequestException("Invalid null map key");
             assert key instanceof Constants.Value;
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(((Constants.Value)key).bytes).build();
+            ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(((Constants.Value)key).bytes).build();
             cf.addColumn(params.makeTombstone(cellName));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 00dd046..6bf46b5 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -54,6 +56,23 @@ public abstract class Operation
         this.t = t;
     }
 
+    // Whether the colum operated on is a static column (on trunk, Operation stores the ColumnDefinition directly,
+    // not just the column name, so we'll be able to remove that lookup and check ColumnDefinition.isStatic field
+    // directly. But for 2.0, it's simpler that way).
+    public boolean isStatic(CFMetaData cfm)
+    {
+        if (columnName == null)
+            return false;
+
+        ColumnDefinition def = cfm.getColumnDefinition(columnName.key);
+        return def != null && def.type == ColumnDefinition.Type.STATIC;
+    }
+
+    protected ColumnNameBuilder maybeUpdatePrefix(CFMetaData cfm, ColumnNameBuilder prefix)
+    {
+        return isStatic(cfm) ? cfm.getStaticColumnNameBuilder() : prefix;
+    }
+
     /**
      * @return whether the operation requires a read of the previous value to be executed
      * (only lists setterByIdx, discard and discardByIdx requires that).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 0fcb8bf..69bc3d3 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -230,7 +230,7 @@ public abstract class Sets
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + add
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Adder.doAdd(t, cf, column, params);
         }
@@ -245,7 +245,7 @@ public abstract class Sets
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAdd(t, cf, prefix.add(columnName.key), params);
+            doAdd(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doAdd(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -283,7 +283,7 @@ public abstract class Sets
                                       ? Collections.singleton(((Constants.Value)value).bytes)
                                       : ((Sets.Value)value).elements;
 
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             for (ByteBuffer bb : toDiscard)
             {
                 ByteBuffer cellName = column.copy().add(bb).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ac6b999..4f3ff4a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -52,9 +52,9 @@ public class TokenFct extends AbstractFunction
 
     private static AbstractType[] getKeyTypes(CFMetaData cfm)
     {
-        AbstractType[] types = new AbstractType[cfm.getCfDef().keys.size()];
+        AbstractType[] types = new AbstractType[cfm.getCfDef().partitionKeyCount()];
         int i = 0;
-        for (CFDefinition.Name name : cfm.getCfDef().keys.values())
+        for (CFDefinition.Name name : cfm.getCfDef().partitionKeys())
             types[i++] = name.type;
         return types;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index f740ea6..85b3547 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -47,8 +47,15 @@ public class AlterTableStatement extends SchemaAlteringStatement
     public final ColumnIdentifier columnName;
     private final CFPropDefs cfProps;
     private final Map<ColumnIdentifier, ColumnIdentifier> renames;
+    private final boolean isStatic; // Only for ALTER ADD
 
-    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public AlterTableStatement(CFName name,
+                               Type type,
+                               ColumnIdentifier columnName,
+                               CQL3Type validator,
+                               CFPropDefs cfProps,
+                               Map<ColumnIdentifier, ColumnIdentifier> renames,
+                               boolean isStatic)
     {
         super(name);
         this.oType = type;
@@ -56,6 +63,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
         this.validator = validator; // used only for ADD/ALTER commands
         this.cfProps = cfProps;
         this.renames = renames;
+        this.isStatic = isStatic;
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
@@ -79,7 +87,11 @@ public class AlterTableStatement extends SchemaAlteringStatement
         {
             case ADD:
                 if (cfDef.isCompact)
-                    throw new InvalidRequestException("Cannot add new column to a compact CF");
+                    throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
+
+                if (isStatic && !cfDef.isComposite)
+                    throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+
                 if (name != null)
                 {
                     switch (name.kind)
@@ -87,7 +99,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         case KEY_ALIAS:
                         case COLUMN_ALIAS:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
-                        case COLUMN_METADATA:
+                        default:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
                     }
                 }
@@ -117,7 +129,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 Integer componentIndex = cfDef.isComposite
                                        ? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
                                        : null;
-                cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName.key, type, componentIndex));
+                cfm.addColumnDefinition(isStatic
+                                        ? ColumnDefinition.staticDef(columnName.key, type, componentIndex)
+                                        : ColumnDefinition.regularDef(columnName.key, type, componentIndex));
                 break;
 
             case ALTER:
@@ -178,6 +192,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         cfm.defaultValidator(validator.getType());
                         break;
                     case COLUMN_METADATA:
+                    case STATIC:
                         ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
                         // Thrift allows to change a column validator so CFMetaData.validateCompatibility will let it slide
                         // if we change to an incompatible type (contrarily to the comparator case). But we don't want to
@@ -196,10 +211,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 break;
 
             case DROP:
-                if (cfDef.isCompact)
-                    throw new InvalidRequestException("Cannot drop columns from a compact CF");
-                if (!cfDef.isComposite)
-                    throw new InvalidRequestException("Cannot drop columns from a non-CQL3 CF");
+                if (cfDef.isCompact || !cfDef.isComposite)
+                    throw new InvalidRequestException("Cannot drop columns from a COMPACT STORAGE table");
                 if (name == null)
                     throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
 
@@ -209,8 +222,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     case COLUMN_ALIAS:
                         throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName));
                     case COLUMN_METADATA:
+                    case STATIC:
                         ColumnDefinition toDelete = null;
-                        for (ColumnDefinition columnDef : cfm.regularColumns())
+                        for (ColumnDefinition columnDef : cfm.regularAndStaticColumns())
                         {
                             if (columnDef.name.equals(columnName.key))
                                 toDelete = columnDef;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6151490..d4acbae 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
 import org.github.jamm.MemoryMeter;
 
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -47,6 +47,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public final Type type;
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
+    private final boolean hasConditions;
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -58,10 +59,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
      */
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
+        this(boundTerms, type, statements, attrs, false);
+    }
+
+    public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs, boolean hasConditions)
+    {
         this.boundTerms = boundTerms;
         this.type = type;
         this.statements = statements;
         this.attrs = attrs;
+        this.hasConditions = hasConditions;
     }
 
     public long measureForPreparedCache(MemoryMeter meter)
@@ -103,25 +110,15 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
-    throws RequestExecutionException, RequestValidationException
-    {
-        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
-        for (ModificationStatement statement : statements)
-            addStatementMutations(statement, variables, local, cl, now, mutations);
-
-        return mutations.values();
-    }
-
-    private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.get(i);
-            addStatementMutations(statement, statementVariables, false, cl, now, mutations);
+            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+            addStatementMutations(statement, statementVariables, local, cl, now, mutations);
         }
         return mutations.values();
     }
@@ -156,31 +153,132 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
-        execute(getMutations(options.getValues(), false, options.getConsistency(), queryState.getTimestamp()), options.getConsistency());
-        return null;
+        return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), queryState.getTimestamp());
     }
 
-    public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
-        execute(getMutations(variables, cl, queryState.getTimestamp()), cl);
+        return execute(new BatchOfPreparedVariables(variables), false, cl, queryState.getTimestamp());
+    }
+
+    public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
+        // We'll need to fix that.
+        if (hasConditions)
+            return executeWithConditions(variables, cl, ConsistencyLevel.SERIAL, now);
+
+        executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+        return null;
     }
 
-    private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+    private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+    private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        ByteBuffer key = null;
+        String ksName = null;
+        String cfName = null;
+        ColumnFamily updates = null;
+        CQL3CasConditions conditions = null;
+        Set<ColumnIdentifier> columnsWithConditions = new LinkedHashSet<ColumnIdentifier>();
+
+        for (int i = 0; i < statements.size(); i++)
+        {
+            ModificationStatement statement = statements.get(i);
+            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementVariables);
+            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+            if (pks.size() > 1)
+                throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
+            if (key == null)
+            {
+                key = pks.get(0);
+                ksName = statement.cfm.ksName;
+                cfName = statement.cfm.cfName;
+                conditions = new CQL3CasConditions(statement.cfm, now);
+                updates = UnsortedColumns.factory.create(statement.cfm);
+            }
+            else if (!key.equals(pks.get(0)))
+            {
+                throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
+            }
+
+            if (statement.hasConditions())
+            {
+                ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
+                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+                // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
+                if (statement.hasIfNotExistCondition())
+                    columnsWithConditions = null;
+                else if (columnsWithConditions != null)
+                    Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions());
+            }
+            else
+            {
+                // getPartitionKey will already have thrown if there is more than one key involved
+                IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
+                updates.resolve(mut.getColumnFamilies().iterator().next());
+            }
+        }
+
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+        return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
+    }
+
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+        assert !hasConditions;
+
+        for (IMutation mutation : getMutations(new PreparedBatchVariables(Collections.<ByteBuffer>emptyList()), true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }
 
+    public interface BatchVariables
+    {
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
+    }
+
+    public static class PreparedBatchVariables implements BatchVariables
+    {
+        private final List<ByteBuffer> variables;
+
+        public PreparedBatchVariables(List<ByteBuffer> variables)
+        {
+            this.variables = variables;
+        }
+
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+        {
+            return variables;
+        }
+    }
+
+    public static class BatchOfPreparedVariables implements BatchVariables
+    {
+        private final List<List<ByteBuffer>> variables;
+
+        public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
+        {
+            this.variables = variables;
+        }
+
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+        {
+            return variables.get(statementInBatch);
+        }
+    }
+
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
@@ -212,11 +310,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             VariableSpecifications boundNames = getBoundVariables();
 
             List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+            boolean hasConditions = false;
             for (ModificationStatement.Parsed parsed : parsedStatements)
             {
                 ModificationStatement stmt = parsed.prepare(boundNames);
                 if (stmt.hasConditions())
-                    throw new InvalidRequestException("Conditional updates are not allowed in batches");
+                    hasConditions = true;
 
                 if (stmt.isCounter() && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -227,10 +326,23 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 statements.add(stmt);
             }
 
+            if (hasConditions)
+            {
+                String ksName = null;
+                String cfName = null;
+                for (ModificationStatement stmt : statements)
+                {
+                    if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
+                        throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
+                    ksName = stmt.keyspace();
+                    cfName = stmt.columnFamily();
+                }
+            }
+
             Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
             prepAttrs.collectMarkerSpecification(boundNames);
 
-            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs), boundNames);
+            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
new file mode 100644
index 0000000..194ff0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.CASConditions;
+
+/**
+ * Processed CAS conditions on potentially multiple rows of the same partition.
+ */
+public class CQL3CasConditions implements CASConditions
+{
+    private final CFMetaData cfm;
+    private final long now;
+
+    // We index RowCondition by the prefix of the row they applied to for 2 reasons:
+    //   1) this allows to keep things sorted to build the ColumnSlice array below
+    //   2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row)
+    private final SortedMap<ByteBuffer, RowCondition> conditions;
+
+    public CQL3CasConditions(CFMetaData cfm, long now)
+    {
+        this.cfm = cfm;
+        this.now = now;
+        this.conditions = new TreeMap<>(cfm.comparator);
+    }
+
+    public void addNotExist(ColumnNameBuilder prefix) throws InvalidRequestException
+    {
+        RowCondition previous = conditions.put(prefix.build(), new NotExistCondition(prefix, now));
+        if (previous != null && !(previous instanceof NotExistCondition))
+            throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+    }
+
+    public void addConditions(ColumnNameBuilder prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        ByteBuffer b = prefix.build();
+        RowCondition condition = conditions.get(b);
+        if (condition == null)
+        {
+            condition = new ColumnsConditions(prefix, now);
+            conditions.put(b, condition);
+        }
+        else if (!(condition instanceof ColumnsConditions))
+        {
+            throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+        }
+        ((ColumnsConditions)condition).addConditions(conds, variables);
+    }
+
+    public IDiskAtomFilter readFilter()
+    {
+        assert !conditions.isEmpty();
+        ColumnSlice[] slices = new ColumnSlice[conditions.size()];
+        int i = 0;
+        // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists
+        // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the
+        // row marker for that (see #6623)
+        for (Map.Entry<ByteBuffer, RowCondition> entry : conditions.entrySet())
+            slices[i++] = new ColumnSlice(entry.getKey(), entry.getValue().rowPrefix.buildAsEndOfRange());
+
+        return new SliceQueryFilter(slices, false, slices.length, cfm.clusteringKeyColumns().size());
+    }
+
+    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+    {
+        for (RowCondition condition : conditions.values())
+        {
+            if (!condition.appliesTo(current))
+                return false;
+        }
+        return true;
+    }
+
+    private static abstract class RowCondition
+    {
+        public final ColumnNameBuilder rowPrefix;
+        protected final long now;
+
+        protected RowCondition(ColumnNameBuilder rowPrefix, long now)
+        {
+            this.rowPrefix = rowPrefix;
+            this.now = now;
+        }
+
+        public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+    }
+
+    private static class NotExistCondition extends RowCondition
+    {
+        private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            if (current == null)
+                return true;
+
+            Iterator<Column> iter = current.iterator(new ColumnSlice[]{ new ColumnSlice(rowPrefix.build(), rowPrefix.buildAsEndOfRange()) });
+            while (iter.hasNext())
+                if (iter.next().isLive(now))
+                    return false;
+            return true;
+        }
+    }
+
+    private static class ColumnsConditions extends RowCondition
+    {
+        private final Map<ColumnIdentifier, ColumnCondition> conditions = new HashMap<>();
+
+        private ColumnsConditions(ColumnNameBuilder rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            for (ColumnCondition condition : conds)
+            {
+                // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
+                // different list of variables. So attach them to the condition directly, it's not particulary elegant but its simpler
+                ColumnCondition previous = conditions.put(condition.column.name, condition.attach(variables));
+                // If 2 conditions are actually equal, let it slide
+                if (previous != null && !previous.equalsTo(condition))
+                    throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
+            }
+        }
+
+        public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+        {
+            if (current == null)
+                return conditions.isEmpty();
+
+            for (ColumnCondition condition : conditions.values())
+                if (!condition.appliesTo(rowPrefix, current, now))
+                    return false;
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 8974523..5c3fcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -32,10 +32,12 @@ public class ColumnGroupMap
 {
     private final ByteBuffer[] fullPath;
     private final Map<ByteBuffer, Value> map = new HashMap<ByteBuffer, Value>();
+    public final boolean isStatic; // Whether or not the group correspond to "static" cells
 
-    private ColumnGroupMap(ByteBuffer[] fullPath)
+    private ColumnGroupMap(ByteBuffer[] fullPath, boolean isStatic)
     {
         this.fullPath = fullPath;
+        this.isStatic = isStatic;
     }
 
     private void add(ByteBuffer[] fullName, int idx, Column column)
@@ -126,7 +128,7 @@ public class ColumnGroupMap
 
             if (currentGroup == null)
             {
-                currentGroup = new ColumnGroupMap(current);
+                currentGroup = new ColumnGroupMap(current, composite.isStaticName(c.name()));
                 currentGroup.add(current, idx, c);
                 previous = current;
                 return;
@@ -135,7 +137,8 @@ public class ColumnGroupMap
             if (!isSameGroup(current))
             {
                 groups.add(currentGroup);
-                currentGroup = new ColumnGroupMap(current);
+                // Note that we know that only the first group built can be static
+                currentGroup = new ColumnGroupMap(current, false);
             }
             currentGroup.add(current, idx, c);
             previous = current;
@@ -167,5 +170,25 @@ public class ColumnGroupMap
             }
             return groups;
         }
+
+        public boolean isEmpty()
+        {
+            return currentGroup == null && groups.isEmpty();
+        }
+
+        public ColumnGroupMap firstGroup()
+        {
+            if (currentGroup != null)
+            {
+                groups.add(currentGroup);
+                currentGroup = null;
+            }
+            return groups.get(0);
+        }
+
+        public void discardFirst()
+        {
+            groups.remove(0);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 3ef6f5a..376fa4a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -87,6 +87,15 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         if (cfm.getCfDef().isCompact && cd.type != ColumnDefinition.Type.REGULAR)
             throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.type, columnName));
 
+        // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
+        // CompositesIndex) and maybe we should, but that means a query like:
+        //     SELECT * FROM foo WHERE static_column = 'bar'
+        // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
+        // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
+        // such indexing is actually useful.
+        if (cd.type == ColumnDefinition.Type.STATIC)
+            throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
+
         if (cd.getValidator().isCollection() && !properties.isCustom)
             throw new InvalidRequestException("Indexes on collections are no yet supported");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 74f7570..632194c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -52,14 +52,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
     private ByteBuffer valueAlias;
 
     private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
+    private final Set<ColumnIdentifier> staticColumns;
     private final CFPropDefs properties;
     private final boolean ifNotExists;
 
-    public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists)
+    public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
     {
         super(name);
         this.properties = properties;
         this.ifNotExists = ifNotExists;
+        this.staticColumns = staticColumns;
 
         try
         {
@@ -101,7 +103,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
         for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
         {
-            columnDefs.put(col.getKey().key, ColumnDefinition.regularDef(col.getKey().key, col.getValue(), componentIndex));
+            ColumnIdentifier id = col.getKey();
+            columnDefs.put(id.key, staticColumns.contains(id)
+                                   ? ColumnDefinition.staticDef(id.key, col.getValue(), componentIndex)
+                                   : ColumnDefinition.regularDef(id.key, col.getValue(), componentIndex));
         }
 
         return columnDefs;
@@ -166,6 +171,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
         private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
         private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
         private final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<ColumnIdentifier, Boolean>(); // Insertion ordering is important
+        private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
 
         private boolean useCompactStorage;
         private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
@@ -195,7 +201,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
             properties.validate();
 
-            CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists);
+            CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
 
             Map<ByteBuffer, CollectionType> definedCollections = null;
             for (Map.Entry<ColumnIdentifier, CQL3Type> entry : definitions.entrySet())
@@ -225,6 +231,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
                 if (t instanceof CounterColumnType)
                     throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+                if (staticColumns.contains(alias))
+                    throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
                 keyTypes.add(t);
             }
             stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
@@ -260,10 +268,13 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 {
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
-                    stmt.columnAliases.add(columnAliases.get(0).key);
-                    stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
+                    ColumnIdentifier alias = columnAliases.get(0);
+                    stmt.columnAliases.add(alias.key);
+                    stmt.comparator = getTypeAndRemove(stmt.columns, alias);
                     if (stmt.comparator instanceof CounterColumnType)
-                        throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
+                        throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+                    if (staticColumns.contains(alias))
+                        throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
                 }
                 else
                 {
@@ -275,6 +286,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
                         AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
                         if (type instanceof CounterColumnType)
                             throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
+                        if (staticColumns.contains(t))
+                            throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
                         types.add(type);
                     }
 
@@ -298,6 +311,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 }
             }
 
+            if (!staticColumns.isEmpty())
+            {
+                // Only CQL3 tables can have static columns
+                if (useCompactStorage)
+                    throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
+                // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
+                if (columnAliases.isEmpty())
+                    throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+            }
+
             if (useCompactStorage && !stmt.columnAliases.isEmpty())
             {
                 if (stmt.columns.isEmpty())
@@ -373,10 +396,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
             return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
         }
 
-        public void addDefinition(ColumnIdentifier def, CQL3Type type)
+        public void addDefinition(ColumnIdentifier def, CQL3Type type, boolean isStatic)
         {
             definedNames.add(def);
             definitions.put(def, type);
+            if (isStatic)
+                staticColumns.add(def);
         }
 
         public void addKeyAliases(List<ColumnIdentifier> aliases)