You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/20 17:51:29 UTC

[2/6] git commit: Add static columns in CQL3

Add static columns in CQL3

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6561


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09d8769
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09d8769
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09d8769

Branch: refs/heads/cassandra-2.1
Commit: b09d876914ad9c9fdf1af35cf48cdb98c27bbf32
Parents: b2b3055
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 9 18:44:21 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 20 10:15:02 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/cql3/CQL.textile                            |  29 +-
 .../org/apache/cassandra/config/CFMetaData.java |  35 +-
 .../cassandra/config/ColumnDefinition.java      |   9 +-
 .../org/apache/cassandra/cql3/CFDefinition.java | 123 +++++--
 .../apache/cassandra/cql3/ColumnCondition.java  | 191 +++++++++++
 .../org/apache/cassandra/cql3/Constants.java    |   5 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |  29 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |   7 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |   8 +-
 .../org/apache/cassandra/cql3/Operation.java    |  19 ++
 src/java/org/apache/cassandra/cql3/Sets.java    |   6 +-
 .../cassandra/cql3/functions/TokenFct.java      |   4 +-
 .../cql3/statements/AlterTableStatement.java    |  32 +-
 .../cql3/statements/BatchStatement.java         | 158 +++++++--
 .../cql3/statements/CQL3CasConditions.java      | 164 +++++++++
 .../cql3/statements/ColumnGroupMap.java         |  29 +-
 .../cql3/statements/CreateIndexStatement.java   |   9 +
 .../cql3/statements/CreateTableStatement.java   |  39 ++-
 .../cql3/statements/DeleteStatement.java        |  33 +-
 .../cql3/statements/ModificationStatement.java  | 331 +++++++++++--------
 .../cql3/statements/SelectStatement.java        | 258 ++++++++++++---
 .../cassandra/cql3/statements/Selection.java    |  33 +-
 .../cql3/statements/UpdateStatement.java        |  20 +-
 .../apache/cassandra/db/filter/ColumnSlice.java |   1 -
 .../db/index/composites/CompositesSearcher.java |  32 +-
 .../db/marshal/AbstractCompositeType.java       |  47 +--
 .../cassandra/db/marshal/CompositeType.java     |  72 +++-
 .../db/marshal/DynamicCompositeType.java        |   6 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   8 +-
 .../apache/cassandra/service/CASConditions.java |   3 +-
 .../cassandra/thrift/ThriftValidation.java      |   3 +-
 33 files changed, 1384 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fffb9a..bbacc4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
  * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
  * Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
  * Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
+ * Add static columns to CQL3 (CASSANDRA-6561)
 Merged from 1.2:
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 03b95e0..d872bde 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -219,7 +219,7 @@ bc(syntax)..
                           '(' <definition> ( ',' <definition> )* ')'
                           ( WITH <option> ( AND <option>)* )?
 
-<column-definition> ::= <identifier> <type> ( PRIMARY KEY )?
+<column-definition> ::= <identifier> <type> ( STATIC )? ( PRIMARY KEY )?
                       | PRIMARY KEY '(' <partition-key> ( ',' <identifier> )* ')'
 
 <partition-key> ::= <partition-key>
@@ -288,11 +288,35 @@ In CQL, the order in which columns are defined for the @PRIMARY KEY@ matters. Th
 
 The remaining columns of the @PRIMARY KEY@ definition, if any, are called __clustering columns. On a given physical node, rows for a given partition key are stored in the order induced by the clustering columns, making the retrieval of rows in that clustering order particularly efficient (see <a href="#selectStmt"><tt>SELECT</tt></a>).
 
+h4(#createTableStatic). @STATIC@ columns
+
+Some columns can be declared as @STATIC@ in a table definition. A column that is static will be "shared" by all the rows belonging to the same partition (having the same partition key). For instance, in:
+
+bc(sample). 
+CREATE TABLE test (
+    pk int,
+    t int,
+    v text,
+    s text static,
+    PRIMARY KEY (pk, t)
+);
+INSERT INTO test(pk, t, v, s) VALUES (0, 0, 'val0', 'static0');
+INSERT INTO test(pk, t, v, s) VALUES (0, 1, 'val1', 'static1');
+SELECT * FROM test WHERE pk=0 AND t=0;
+
+the last query will return @'static1'@ as value for @s@, since @s@ is static and thus the 2nd insertion modified this "shared" value. Note however that static columns are only static within a given partition, and if in the example above both rows where from different partitions (i.e. if they had different value for @pk@), then the 2nd insertion would not have modified the value of @s@ for the first row.
+
+A few restrictions applies to when static columns are allowed:
+* tables with the @COMPACT STORAGE@ option (see below) cannot have them
+* a table without clustering columns cannot have static columns (in a table without clustering columns, every partition has only one row, and so every column is inherently static).
+* only non @PRIMARY KEY@ columns can be static
+
+
 h4(#createTableOptions). @<option>@
 
 The @CREATE TABLE@ statement supports a number of options that controls the configuration of a new table. These options can be specified after the @WITH@ keyword.
 
-The first of these option is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details).  The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table.  Most notably, @COMPACT STORAGE@ tables cannot have collections and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which imply in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
+The first of these option is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details).  The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table.  Most notably, @COMPACT STORAGE@ tables cannot have collections nor static columns and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which imply in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
 
 Another option is @CLUSTERING ORDER@. It allows to define the ordering of rows on disk. It takes the list of the clustering column names with, for each of them, the on-disk order (Ascending or descending). Note that this option affects "what @ORDER BY@ are allowed during @SELECT@":#selectOrderBy.
 
@@ -1116,6 +1140,7 @@ The following describes the addition/changes brought for each version of CQL.
 h3. 3.1.5
 
 * It is now possible to group clustering columns in a relatiion, see "SELECT Where clauses":#selectWhere.
+* Added support for @STATIC@ columns, see "static in CREATE TABLE":#createTableStatic.
 
 h3. 3.1.4
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 714a8bc..a319930 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -27,6 +27,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.ArrayUtils;
@@ -408,6 +409,7 @@ public final class CFMetaData
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringKeyColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
     private volatile Set<ColumnDefinition> regularColumns;
+    private volatile Set<ColumnDefinition> staticColumns;
     private volatile ColumnDefinition compactValueColumn;
 
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -721,6 +723,16 @@ public final class CFMetaData
         return regularColumns;
     }
 
+    public Set<ColumnDefinition> staticColumns()
+    {
+        return staticColumns;
+    }
+
+    public Iterable<ColumnDefinition> regularAndStaticColumns()
+    {
+        return Iterables.concat(staticColumns, regularColumns);
+    }
+
     public ColumnDefinition compactValueColumn()
     {
         return compactValueColumn;
@@ -1328,7 +1340,7 @@ public final class CFMetaData
         // Mixing counter with non counter columns is not supported (#2614)
         if (defaultValidator instanceof CounterColumnType)
         {
-            for (ColumnDefinition def : regularColumns)
+            for (ColumnDefinition def : regularAndStaticColumns())
                 if (!(def.getValidator() instanceof CounterColumnType))
                     throw new ConfigurationException("Cannot add a non counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a counter column family");
         }
@@ -1839,7 +1851,7 @@ public final class CFMetaData
         if (column_metadata.get(to) != null)
             throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", strFrom, strTo, cfName));
 
-        if (def.type == ColumnDefinition.Type.REGULAR)
+        if (def.type == ColumnDefinition.Type.REGULAR || def.type == ColumnDefinition.Type.STATIC)
         {
             throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", strFrom));
         }
@@ -1883,6 +1895,7 @@ public final class CFMetaData
                      : comparator.componentsCount() - (hasCollection() ? 2 : 1);
         List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
         Set<ColumnDefinition> regCols = new HashSet<ColumnDefinition>();
+        Set<ColumnDefinition> statCols = new HashSet<ColumnDefinition>();
         ColumnDefinition compactCol = null;
 
         for (ColumnDefinition def : column_metadata.values())
@@ -1900,6 +1913,9 @@ public final class CFMetaData
                 case REGULAR:
                     regCols.add(def);
                     break;
+                case STATIC:
+                    statCols.add(def);
+                    break;
                 case COMPACT_VALUE:
                     assert compactCol == null : "There shouldn't be more than one compact value defined: got " + compactCol + " and " + def;
                     compactCol = def;
@@ -1911,6 +1927,7 @@ public final class CFMetaData
         partitionKeyColumns = addDefaultKeyAliases(pkCols);
         clusteringKeyColumns = addDefaultColumnAliases(ckCols);
         regularColumns = regCols;
+        staticColumns = statCols;
         compactValueColumn = addDefaultValueAlias(compactCol, isDense);
     }
 
@@ -2074,6 +2091,20 @@ public final class CFMetaData
         return true;
     }
 
+    public boolean hasStaticColumns()
+    {
+        return !staticColumns.isEmpty();
+    }
+
+    public ColumnNameBuilder getStaticColumnNameBuilder()
+    {
+        assert comparator instanceof CompositeType && clusteringKeyColumns().size() > 0;
+        CompositeType.Builder builder = CompositeType.Builder.staticBuilder((CompositeType)comparator);
+        for (int i = 0; i < clusteringKeyColumns().size(); i++)
+            builder.add(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        return builder;
+    }
+
     public void validateColumns(Iterable<Column> columns)
     {
         for (Column column : columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 7ca4d45..11340e7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -64,7 +64,8 @@ public class ColumnDefinition
         PARTITION_KEY,
         CLUSTERING_KEY,
         REGULAR,
-        COMPACT_VALUE
+        COMPACT_VALUE,
+        STATIC
     }
 
     public final ByteBuffer name;
@@ -96,6 +97,11 @@ public class ColumnDefinition
         return new ColumnDefinition(name, validator, componentIndex, Type.REGULAR);
     }
 
+    public static ColumnDefinition staticDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+    {
+        return new ColumnDefinition(name, validator, componentIndex, Type.STATIC);
+    }
+
     public static ColumnDefinition compactValueDef(ByteBuffer name, AbstractType<?> validator)
     {
         return new ColumnDefinition(name, validator, null, Type.COMPACT_VALUE);
@@ -174,6 +180,7 @@ public class ColumnDefinition
 
     public boolean isThriftCompatible()
     {
+        // componentIndex == null should always imply isStatic in practice, but there is no harm in being too careful here.
         return type == ColumnDefinition.Type.REGULAR && componentIndex == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 638770d..b589a95 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -42,11 +42,12 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
 
     public final CFMetaData cfm;
     // LinkedHashMap because the order does matter (it is the order in the composite type)
-    public final LinkedHashMap<ColumnIdentifier, Name> keys = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final LinkedHashMap<ColumnIdentifier, Name> columns = new LinkedHashMap<ColumnIdentifier, Name>();
-    public final Name value;
+    private final LinkedHashMap<ColumnIdentifier, Name> partitionKeys = new LinkedHashMap<ColumnIdentifier, Name>();
+    private final LinkedHashMap<ColumnIdentifier, Name> clusteringColumns = new LinkedHashMap<ColumnIdentifier, Name>();
+    private final Name compactValue;
     // Keep metadata lexicographically ordered so that wildcard expansion have a deterministic order
-    public final Map<ColumnIdentifier, Name> metadata = new TreeMap<ColumnIdentifier, Name>();
+    private final Map<ColumnIdentifier, Name> staticColumns = new TreeMap<ColumnIdentifier, Name>();
+    private final Map<ColumnIdentifier, Name> regularColumns = new TreeMap<ColumnIdentifier, Name>();
 
     public final boolean isComposite;
     public final boolean hasCompositeKey;
@@ -65,7 +66,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         for (int i = 0; i < cfm.partitionKeyColumns().size(); ++i)
         {
             ColumnIdentifier id = new ColumnIdentifier(cfm.partitionKeyColumns().get(i).name, definitionType);
-            this.keys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
+            this.partitionKeys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
         }
 
         this.isComposite = cfm.comparator instanceof CompositeType;
@@ -74,20 +75,25 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         for (int i = 0; i < cfm.clusteringKeyColumns().size(); ++i)
         {
             ColumnIdentifier id = new ColumnIdentifier(cfm.clusteringKeyColumns().get(i).name, definitionType);
-            this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
+            this.clusteringColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
         }
 
         if (isCompact)
         {
-            this.value = createValue(cfm);
+            this.compactValue = createValue(cfm);
         }
         else
         {
-            this.value = null;
+            this.compactValue = null;
             for (ColumnDefinition def : cfm.regularColumns())
             {
                 ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
-                this.metadata.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+                this.regularColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+            }
+            for (ColumnDefinition def : cfm.staticColumns())
+            {
+                ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
+                this.staticColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.STATIC, def.getValidator()));
             }
         }
     }
@@ -111,44 +117,86 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
                : new Name(cfm.ksName, cfm.cfName, alias, Name.Kind.VALUE_ALIAS, cfm.getDefaultValidator());
     }
 
+    public int partitionKeyCount()
+    {
+        return partitionKeys.size();
+    }
+
+    public Collection<Name> partitionKeys()
+    {
+        return partitionKeys.values();
+    }
+
+    public int clusteringColumnsCount()
+    {
+        return clusteringColumns.size();
+    }
+
+    public Collection<Name> clusteringColumns()
+    {
+        return clusteringColumns.values();
+    }
+
+    public Collection<Name> regularColumns()
+    {
+        return regularColumns.values();
+    }
+
+    public Collection<Name> staticColumns()
+    {
+        return regularColumns.values();
+    }
+
+    public Name compactValue()
+    {
+        return compactValue;
+    }
+
     public Name get(ColumnIdentifier name)
     {
-        CFDefinition.Name kdef = keys.get(name);
-        if (kdef != null)
-            return kdef;
-        if (value != null && name.equals(value.name))
-            return value;
-        CFDefinition.Name def = columns.get(name);
+        CFDefinition.Name def = partitionKeys.get(name);
         if (def != null)
             return def;
-        return metadata.get(name);
+        if (compactValue != null && name.equals(compactValue.name))
+            return compactValue;
+        def = clusteringColumns.get(name);
+        if (def != null)
+            return def;
+        def = regularColumns.get(name);
+        if (def != null)
+            return def;
+        return staticColumns.get(name);
     }
 
     public Iterator<Name> iterator()
     {
         return new AbstractIterator<Name>()
         {
-            private final Iterator<Name> keyIter = keys.values().iterator();
-            private final Iterator<Name> columnIter = columns.values().iterator();
+            private final Iterator<Name> keyIter = partitionKeys.values().iterator();
+            private final Iterator<Name> clusteringIter = clusteringColumns.values().iterator();
             private boolean valueDone;
-            private final Iterator<Name> metadataIter = metadata.values().iterator();
+            private final Iterator<Name> staticIter = staticColumns.values().iterator();
+            private final Iterator<Name> regularIter = regularColumns.values().iterator();
 
             protected Name computeNext()
             {
                 if (keyIter.hasNext())
                     return keyIter.next();
 
-                if (columnIter.hasNext())
-                    return columnIter.next();
+                if (clusteringIter.hasNext())
+                    return clusteringIter.next();
 
-                if (value != null && !valueDone)
+                if (compactValue != null && !valueDone)
                 {
                     valueDone = true;
-                    return value;
+                    return compactValue;
                 }
 
-                if (metadataIter.hasNext())
-                    return metadataIter.next();
+                if (staticIter.hasNext())
+                    return staticIter.next();
+
+                if (regularIter.hasNext())
+                    return regularIter.next();
 
                 return endOfData();
             }
@@ -173,7 +221,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
     {
         public static enum Kind
         {
-            KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA
+            KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA, STATIC
         }
 
         private Name(String ksName, String cfName, ColumnIdentifier name, Kind kind, AbstractType<?> type)
@@ -210,20 +258,29 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         {
             return Objects.hashCode(ksName, cfName, name, type, kind, position);
         }
+
+        public boolean isPrimaryKeyColumn()
+        {
+            return kind == Kind.KEY_ALIAS || kind == Kind.COLUMN_ALIAS;
+        }
     }
 
     @Override
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        sb.append(Joiner.on(", ").join(keys.values()));
-        if (!columns.isEmpty())
-            sb.append(", ").append(Joiner.on(", ").join(columns.values()));
+        sb.append(Joiner.on(", ").join(partitionKeys.values()));
+        if (!clusteringColumns.isEmpty())
+            sb.append(", ").append(Joiner.on(", ").join(clusteringColumns.values()));
         sb.append(" => ");
-        if (value != null)
-            sb.append(value.name);
-        if (!metadata.isEmpty())
-            sb.append("{").append(Joiner.on(", ").join(metadata.values())).append(" }");
+        if (compactValue != null)
+            sb.append(compactValue.name);
+        sb.append("{");
+        sb.append(Joiner.on(", ").join(staticColumns.values()));
+        if (!staticColumns.isEmpty())
+            sb.append(", ");
+        sb.append(Joiner.on(", ").join(regularColumns.values()));
+        sb.append("}");
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
new file mode 100644
index 0000000..797dba6
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A CQL3 condition.
+ */
+public class ColumnCondition
+{
+    public final CFDefinition.Name column;
+    private final Term value;
+
+    private List<ByteBuffer> variables;
+
+    private ColumnCondition(CFDefinition.Name column, Term value)
+    {
+        this.column = column;
+        this.value = value;
+    }
+
+    // The only ones we support so far
+    public static ColumnCondition equal(CFDefinition.Name column, Term value)
+    {
+        return new ColumnCondition(column, value);
+    }
+
+    // See CQL3CasConditions for why it's convenient to have this
+    public ColumnCondition attach(List<ByteBuffer> variables)
+    {
+        this.variables = variables;
+        return this;
+    }
+
+    /**
+     * Collects the column specification for the bind variables of this operation.
+     *
+     * @param boundNames the list of column specification where to collect the
+     * bind variables of this term in.
+     */
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        value.collectMarkerSpecification(boundNames);
+    }
+
+    // Not overriding equals() because we need the variables to have been attached when this is
+    // called and so having a non standard method name might help avoid mistakes
+    public boolean equalsTo(ColumnCondition other) throws InvalidRequestException
+    {
+        return column.equals(other.column)
+            && value.bindAndGet(variables).equals(other.value.bindAndGet(other.variables));
+    }
+
+    private ColumnNameBuilder copyOrUpdatePrefix(CFMetaData cfm, ColumnNameBuilder rowPrefix)
+    {
+        return column.kind == CFDefinition.Name.Kind.STATIC ? cfm.getStaticColumnNameBuilder() : rowPrefix.copy();
+    }
+
+    /**
+     * Validates whether this condition applies to {@code current}.
+     */
+    public boolean appliesTo(ColumnNameBuilder rowPrefix, ColumnFamily current, long now) throws InvalidRequestException
+    {
+        if (column.type instanceof CollectionType)
+            return collectionAppliesTo((CollectionType)column.type, rowPrefix, current, now);
+
+        Column c = current.getColumn(copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key).build());
+        ByteBuffer v = value.bindAndGet(variables);
+        return v == null
+             ? c == null || !c.isLive(now)
+             : c != null && c.isLive(now) && c.value().equals(v);
+    }
+
+    private boolean collectionAppliesTo(CollectionType type, ColumnNameBuilder rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
+    {
+        ColumnNameBuilder collectionPrefix = copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key);
+        // We are testing for collection equality, so we need to have the expected values *and* only those.
+        ColumnSlice[] collectionSlice = new ColumnSlice[]{ new ColumnSlice(collectionPrefix.build(), collectionPrefix.buildAsEndOfRange()) };
+        // Filter live columns, this makes things simpler afterwards
+        Iterator<Column> iter = Iterators.filter(current.iterator(collectionSlice), new Predicate<Column>()
+        {
+            public boolean apply(Column c)
+            {
+                // we only care about live columns
+                return c.isLive(now);
+            }
+        });
+
+        Term.Terminal v = value.bind(variables);
+        if (v == null)
+            return !iter.hasNext();
+
+        switch (type.kind)
+        {
+            case LIST: return listAppliesTo(current.metadata(), iter, ((Lists.Value)v).elements);
+            case SET: return setAppliesTo(current.metadata(), iter, ((Sets.Value)v).elements);
+            case MAP: return mapAppliesTo(current.metadata(), iter, ((Maps.Value)v).map);
+        }
+        throw new AssertionError();
+    }
+
+    private static ByteBuffer collectionKey(CFMetaData cfm, Column c)
+    {
+        ByteBuffer[] bbs = ((CompositeType)cfm.comparator).split(c.name());
+        return bbs[bbs.length - 1];
+    }
+
+    private boolean listAppliesTo(CFMetaData cfm, Iterator<Column> iter, List<ByteBuffer> elements)
+    {
+        for (ByteBuffer e : elements)
+            if (!iter.hasNext() || iter.next().value().equals(e))
+                return false;
+        // We must not have more elements than expected
+        return !iter.hasNext();
+    }
+
+    private boolean setAppliesTo(CFMetaData cfm, Iterator<Column> iter, Set<ByteBuffer> elements)
+    {
+        Set<ByteBuffer> remaining = new HashSet<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            if (!remaining.remove(collectionKey(cfm, iter.next())))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    private boolean mapAppliesTo(CFMetaData cfm, Iterator<Column> iter, Map<ByteBuffer, ByteBuffer> elements)
+    {
+        Map<ByteBuffer, ByteBuffer> remaining = new HashMap<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            Column c = iter.next();
+            if (!remaining.remove(collectionKey(cfm, c)).equals(c.value()))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    public static class Raw
+    {
+        private final Term.Raw value;
+
+        public Raw(Term.Raw value)
+        {
+            this.value = value;
+        }
+
+        public ColumnCondition prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        {
+            if (receiver.type instanceof CounterColumnType)
+                throw new InvalidRequestException("Condtions on counters are not supported");
+
+            return ColumnCondition.equal(receiver, value.prepare(receiver));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index bcfe00d..f99fd02 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -296,6 +296,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             ByteBuffer value = t.bindAndGet(params.variables);
             cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
@@ -315,6 +316,7 @@ public abstract class Constants
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
             long increment = ByteBufferUtil.toLong(bytes);
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             cf.addCounter(cname, increment);
         }
@@ -337,6 +339,7 @@ public abstract class Constants
             if (increment == Long.MIN_VALUE)
                 throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)");
 
+            prefix = maybeUpdatePrefix(cf.metadata(), prefix);
             ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
             cf.addCounter(cname, -increment);
         }
@@ -356,7 +359,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
 
             if (isCollection)
                 cf.addAtom(params.makeRangeTombstone(column.build(), column.buildAsEndOfRange()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 6e7cf1c..a11a818 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -351,21 +351,22 @@ updateStatement returns [UpdateStatement.ParsedUpdate expr]
       ( usingClause[attrs] )?
       K_SET columnOperation[operations] (',' columnOperation[operations])*
       K_WHERE wclause=whereClause
-      ( K_IF conditions=updateCondition )?
+      ( K_IF conditions=updateConditions )?
       {
           return new UpdateStatement.ParsedUpdate(cf,
                                                   attrs,
                                                   operations,
                                                   wclause,
-                                                  conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+                                                  conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
      }
     ;
 
-updateCondition returns [List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions]
-    @init { conditions = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>(); }
-    : columnOperation[conditions] ( K_AND columnOperation[conditions] )*
+updateConditions returns [List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+    @init { conditions = new ArrayList<Pair<ColumnIdentifier, ColumnCondition.Raw>>(); }
+    : columnCondition[conditions] ( K_AND columnCondition[conditions] )*
     ;
 
+
 /**
  * DELETE name1, name2
  * FROM <CF>
@@ -381,13 +382,13 @@ deleteStatement returns [DeleteStatement.Parsed expr]
       K_FROM cf=columnFamilyName
       ( usingClauseDelete[attrs] )?
       K_WHERE wclause=whereClause
-      ( K_IF conditions=updateCondition )?
+      ( K_IF conditions=updateConditions )?
       {
           return new DeleteStatement.Parsed(cf,
                                             attrs,
                                             columnDeletions,
                                             wclause,
-                                            conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+                                            conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
       }
     ;
 
@@ -484,7 +485,8 @@ cfamDefinition[CreateTableStatement.RawStatement expr]
     ;
 
 cfamColumns[CreateTableStatement.RawStatement expr]
-    : k=cident v=comparatorType { $expr.addDefinition(k, v); } (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
+    : k=cident v=comparatorType { boolean isStatic=false; } (K_STATIC {isStatic = true;})? { $expr.addDefinition(k, v, isStatic); }
+        (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
     | K_PRIMARY K_KEY '(' pkDef[expr] (',' c=cident { $expr.addColumnAlias(c); } )* ')'
     ;
 
@@ -558,10 +560,11 @@ alterTableStatement returns [AlterTableStatement expr]
         AlterTableStatement.Type type = null;
         CFPropDefs props = new CFPropDefs();
         Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>();
+        boolean isStatic = false;
     }
     : K_ALTER K_COLUMNFAMILY cf=columnFamilyName
           ( K_ALTER id=cident K_TYPE v=comparatorType { type = AlterTableStatement.Type.ALTER; }
-          | K_ADD   id=cident v=comparatorType        { type = AlterTableStatement.Type.ADD; }
+          | K_ADD   id=cident v=comparatorType ({ isStatic=true; } K_STATIC)? { type = AlterTableStatement.Type.ADD; }
           | K_DROP  id=cident                         { type = AlterTableStatement.Type.DROP; }
           | K_WITH  properties[props]                 { type = AlterTableStatement.Type.OPTS; }
           | K_RENAME                                  { type = AlterTableStatement.Type.RENAME; }
@@ -569,7 +572,7 @@ alterTableStatement returns [AlterTableStatement expr]
                ( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); } )*
           )
     {
-        $expr = new AlterTableStatement(cf, type, id, v, props, renames);
+        $expr = new AlterTableStatement(cf, type, id, v, props, renames, isStatic);
     }
     ;
 
@@ -845,6 +848,10 @@ columnOperation[List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations]
       }
     ;
 
+columnCondition[List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+    : key=cident '=' t=term { conditions.add(Pair.create(key, new ColumnCondition.Raw(t))); } // Note: we'll reject duplicates later
+    ;
+
 properties[PropertyDefinitions props]
     : property[props] (K_AND property[props])*
     ;
@@ -981,6 +988,7 @@ unreserved_function_keyword returns [String str]
         | K_CUSTOM
         | K_TRIGGER
         | K_DISTINCT
+        | K_STATIC
         ) { $str = $k.text; }
     | t=native_type { $str = t.toString(); }
     ;
@@ -1084,6 +1092,7 @@ K_NAN:         N A N;
 K_INFINITY:    I N F I N I T Y;
 
 K_TRIGGER:     T R I G G E R;
+K_STATIC:      S T A T I C;
 
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 4ca5eb3..4ad39db 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -266,7 +266,7 @@ public abstract class Lists
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + append
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Appender.doAppend(t, cf, column, params);
         }
@@ -309,6 +309,7 @@ public abstract class Lists
                 throw new InvalidRequestException(String.format("List index %d out of bound, list has size %d", idx, existingList.size()));
 
             ByteBuffer elementName = existingList.get(idx).right.name();
+            // Since we reuse the name we're read, if it's a static column, the static marker will already be set
 
             if (value == null)
             {
@@ -336,7 +337,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAppend(t, cf, prefix.add(columnName.key), params);
+            doAppend(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doAppend(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -376,7 +377,7 @@ public abstract class Lists
             long time = PrecisionTime.REFERENCE_TIME - (System.currentTimeMillis() - PrecisionTime.REFERENCE_TIME);
 
             List<ByteBuffer> toAdd = ((Lists.Value)value).elements;
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             for (int i = 0; i < toAdd.size(); i++)
             {
                 ColumnNameBuilder b = i == toAdd.size() - 1 ? column : column.copy();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index 30d796c..c332999 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -244,7 +244,7 @@ public abstract class Maps
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + put
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Putter.doPut(t, cf, column, params);
         }
@@ -274,7 +274,7 @@ public abstract class Maps
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(key).build();
+            ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(key).build();
 
             if (value == null)
             {
@@ -302,7 +302,7 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doPut(t, cf, prefix.add(columnName.key), params);
+            doPut(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doPut(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -335,7 +335,7 @@ public abstract class Maps
                 throw new InvalidRequestException("Invalid null map key");
             assert key instanceof Constants.Value;
 
-            ByteBuffer cellName = prefix.add(columnName.key).add(((Constants.Value)key).bytes).build();
+            ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(((Constants.Value)key).bytes).build();
             cf.addColumn(params.makeTombstone(cellName));
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 00dd046..6bf46b5 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -54,6 +56,23 @@ public abstract class Operation
         this.t = t;
     }
 
+    // Whether the colum operated on is a static column (on trunk, Operation stores the ColumnDefinition directly,
+    // not just the column name, so we'll be able to remove that lookup and check ColumnDefinition.isStatic field
+    // directly. But for 2.0, it's simpler that way).
+    public boolean isStatic(CFMetaData cfm)
+    {
+        if (columnName == null)
+            return false;
+
+        ColumnDefinition def = cfm.getColumnDefinition(columnName.key);
+        return def != null && def.type == ColumnDefinition.Type.STATIC;
+    }
+
+    protected ColumnNameBuilder maybeUpdatePrefix(CFMetaData cfm, ColumnNameBuilder prefix)
+    {
+        return isStatic(cfm) ? cfm.getStaticColumnNameBuilder() : prefix;
+    }
+
     /**
      * @return whether the operation requires a read of the previous value to be executed
      * (only lists setterByIdx, discard and discardByIdx requires that).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 0fcb8bf..69bc3d3 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -230,7 +230,7 @@ public abstract class Sets
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
             // delete + add
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
             Adder.doAdd(t, cf, column, params);
         }
@@ -245,7 +245,7 @@ public abstract class Sets
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
         {
-            doAdd(t, cf, prefix.add(columnName.key), params);
+            doAdd(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
         }
 
         static void doAdd(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -283,7 +283,7 @@ public abstract class Sets
                                       ? Collections.singleton(((Constants.Value)value).bytes)
                                       : ((Sets.Value)value).elements;
 
-            ColumnNameBuilder column = prefix.add(columnName.key);
+            ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
             for (ByteBuffer bb : toDiscard)
             {
                 ByteBuffer cellName = column.copy().add(bb).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ac6b999..4f3ff4a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -52,9 +52,9 @@ public class TokenFct extends AbstractFunction
 
     private static AbstractType[] getKeyTypes(CFMetaData cfm)
     {
-        AbstractType[] types = new AbstractType[cfm.getCfDef().keys.size()];
+        AbstractType[] types = new AbstractType[cfm.getCfDef().partitionKeyCount()];
         int i = 0;
-        for (CFDefinition.Name name : cfm.getCfDef().keys.values())
+        for (CFDefinition.Name name : cfm.getCfDef().partitionKeys())
             types[i++] = name.type;
         return types;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index f740ea6..85b3547 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -47,8 +47,15 @@ public class AlterTableStatement extends SchemaAlteringStatement
     public final ColumnIdentifier columnName;
     private final CFPropDefs cfProps;
     private final Map<ColumnIdentifier, ColumnIdentifier> renames;
+    private final boolean isStatic; // Only for ALTER ADD
 
-    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
+    public AlterTableStatement(CFName name,
+                               Type type,
+                               ColumnIdentifier columnName,
+                               CQL3Type validator,
+                               CFPropDefs cfProps,
+                               Map<ColumnIdentifier, ColumnIdentifier> renames,
+                               boolean isStatic)
     {
         super(name);
         this.oType = type;
@@ -56,6 +63,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
         this.validator = validator; // used only for ADD/ALTER commands
         this.cfProps = cfProps;
         this.renames = renames;
+        this.isStatic = isStatic;
     }
 
     public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
@@ -79,7 +87,11 @@ public class AlterTableStatement extends SchemaAlteringStatement
         {
             case ADD:
                 if (cfDef.isCompact)
-                    throw new InvalidRequestException("Cannot add new column to a compact CF");
+                    throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
+
+                if (isStatic && !cfDef.isComposite)
+                    throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+
                 if (name != null)
                 {
                     switch (name.kind)
@@ -87,7 +99,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         case KEY_ALIAS:
                         case COLUMN_ALIAS:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
-                        case COLUMN_METADATA:
+                        default:
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
                     }
                 }
@@ -117,7 +129,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 Integer componentIndex = cfDef.isComposite
                                        ? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
                                        : null;
-                cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName.key, type, componentIndex));
+                cfm.addColumnDefinition(isStatic
+                                        ? ColumnDefinition.staticDef(columnName.key, type, componentIndex)
+                                        : ColumnDefinition.regularDef(columnName.key, type, componentIndex));
                 break;
 
             case ALTER:
@@ -178,6 +192,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         cfm.defaultValidator(validator.getType());
                         break;
                     case COLUMN_METADATA:
+                    case STATIC:
                         ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
                         // Thrift allows to change a column validator so CFMetaData.validateCompatibility will let it slide
                         // if we change to an incompatible type (contrarily to the comparator case). But we don't want to
@@ -196,10 +211,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 break;
 
             case DROP:
-                if (cfDef.isCompact)
-                    throw new InvalidRequestException("Cannot drop columns from a compact CF");
-                if (!cfDef.isComposite)
-                    throw new InvalidRequestException("Cannot drop columns from a non-CQL3 CF");
+                if (cfDef.isCompact || !cfDef.isComposite)
+                    throw new InvalidRequestException("Cannot drop columns from a COMPACT STORAGE table");
                 if (name == null)
                     throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
 
@@ -209,8 +222,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
                     case COLUMN_ALIAS:
                         throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName));
                     case COLUMN_METADATA:
+                    case STATIC:
                         ColumnDefinition toDelete = null;
-                        for (ColumnDefinition columnDef : cfm.regularColumns())
+                        for (ColumnDefinition columnDef : cfm.regularAndStaticColumns())
                         {
                             if (columnDef.name.equals(columnName.key))
                                 toDelete = columnDef;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6151490..d4acbae 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.Iterables;
 import org.github.jamm.MemoryMeter;
 
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
@@ -47,6 +47,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public final Type type;
     private final List<ModificationStatement> statements;
     private final Attributes attrs;
+    private final boolean hasConditions;
 
     /**
      * Creates a new BatchStatement from a list of statements and a
@@ -58,10 +59,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
      */
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
     {
+        this(boundTerms, type, statements, attrs, false);
+    }
+
+    public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs, boolean hasConditions)
+    {
         this.boundTerms = boundTerms;
         this.type = type;
         this.statements = statements;
         this.attrs = attrs;
+        this.hasConditions = hasConditions;
     }
 
     public long measureForPreparedCache(MemoryMeter meter)
@@ -103,25 +110,15 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
-    throws RequestExecutionException, RequestValidationException
-    {
-        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
-        for (ModificationStatement statement : statements)
-            addStatementMutations(statement, variables, local, cl, now, mutations);
-
-        return mutations.values();
-    }
-
-    private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.get(i);
-            addStatementMutations(statement, statementVariables, false, cl, now, mutations);
+            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+            addStatementMutations(statement, statementVariables, local, cl, now, mutations);
         }
         return mutations.values();
     }
@@ -156,31 +153,132 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
-        execute(getMutations(options.getValues(), false, options.getConsistency(), queryState.getTimestamp()), options.getConsistency());
-        return null;
+        return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), queryState.getTimestamp());
     }
 
-    public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
-        execute(getMutations(variables, cl, queryState.getTimestamp()), cl);
+        return execute(new BatchOfPreparedVariables(variables), false, cl, queryState.getTimestamp());
+    }
+
+    public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
+        // We'll need to fix that.
+        if (hasConditions)
+            return executeWithConditions(variables, cl, ConsistencyLevel.SERIAL, now);
+
+        executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+        return null;
     }
 
-    private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+    private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
     {
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
+    private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        ByteBuffer key = null;
+        String ksName = null;
+        String cfName = null;
+        ColumnFamily updates = null;
+        CQL3CasConditions conditions = null;
+        Set<ColumnIdentifier> columnsWithConditions = new LinkedHashSet<ColumnIdentifier>();
+
+        for (int i = 0; i < statements.size(); i++)
+        {
+            ModificationStatement statement = statements.get(i);
+            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementVariables);
+            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+            if (pks.size() > 1)
+                throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
+            if (key == null)
+            {
+                key = pks.get(0);
+                ksName = statement.cfm.ksName;
+                cfName = statement.cfm.cfName;
+                conditions = new CQL3CasConditions(statement.cfm, now);
+                updates = UnsortedColumns.factory.create(statement.cfm);
+            }
+            else if (!key.equals(pks.get(0)))
+            {
+                throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
+            }
+
+            if (statement.hasConditions())
+            {
+                ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
+                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+                // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
+                if (statement.hasIfNotExistCondition())
+                    columnsWithConditions = null;
+                else if (columnsWithConditions != null)
+                    Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions());
+            }
+            else
+            {
+                // getPartitionKey will already have thrown if there is more than one key involved
+                IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
+                updates.resolve(mut.getColumnFamilies().iterator().next());
+            }
+        }
+
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+        return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
+    }
+
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+        assert !hasConditions;
+
+        for (IMutation mutation : getMutations(new PreparedBatchVariables(Collections.<ByteBuffer>emptyList()), true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }
 
+    public interface BatchVariables
+    {
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
+    }
+
+    public static class PreparedBatchVariables implements BatchVariables
+    {
+        private final List<ByteBuffer> variables;
+
+        public PreparedBatchVariables(List<ByteBuffer> variables)
+        {
+            this.variables = variables;
+        }
+
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+        {
+            return variables;
+        }
+    }
+
+    public static class BatchOfPreparedVariables implements BatchVariables
+    {
+        private final List<List<ByteBuffer>> variables;
+
+        public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
+        {
+            this.variables = variables;
+        }
+
+        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+        {
+            return variables.get(statementInBatch);
+        }
+    }
+
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
@@ -212,11 +310,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             VariableSpecifications boundNames = getBoundVariables();
 
             List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+            boolean hasConditions = false;
             for (ModificationStatement.Parsed parsed : parsedStatements)
             {
                 ModificationStatement stmt = parsed.prepare(boundNames);
                 if (stmt.hasConditions())
-                    throw new InvalidRequestException("Conditional updates are not allowed in batches");
+                    hasConditions = true;
 
                 if (stmt.isCounter() && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -227,10 +326,23 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 statements.add(stmt);
             }
 
+            if (hasConditions)
+            {
+                String ksName = null;
+                String cfName = null;
+                for (ModificationStatement stmt : statements)
+                {
+                    if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
+                        throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
+                    ksName = stmt.keyspace();
+                    cfName = stmt.columnFamily();
+                }
+            }
+
             Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
             prepAttrs.collectMarkerSpecification(boundNames);
 
-            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs), boundNames);
+            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
new file mode 100644
index 0000000..194ff0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.CASConditions;
+
+/**
+ * Processed CAS conditions on potentially multiple rows of the same partition.
+ */
+public class CQL3CasConditions implements CASConditions
+{
+    private final CFMetaData cfm;
+    private final long now;
+
+    // We index RowCondition by the prefix of the row they applied to for 2 reasons:
+    //   1) this allows to keep things sorted to build the ColumnSlice array below
+    //   2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row)
+    private final SortedMap<ByteBuffer, RowCondition> conditions;
+
+    public CQL3CasConditions(CFMetaData cfm, long now)
+    {
+        this.cfm = cfm;
+        this.now = now;
+        this.conditions = new TreeMap<>(cfm.comparator);
+    }
+
+    public void addNotExist(ColumnNameBuilder prefix) throws InvalidRequestException
+    {
+        RowCondition previous = conditions.put(prefix.build(), new NotExistCondition(prefix, now));
+        if (previous != null && !(previous instanceof NotExistCondition))
+            throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+    }
+
+    public void addConditions(ColumnNameBuilder prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+    {
+        ByteBuffer b = prefix.build();
+        RowCondition condition = conditions.get(b);
+        if (condition == null)
+        {
+            condition = new ColumnsConditions(prefix, now);
+            conditions.put(b, condition);
+        }
+        else if (!(condition instanceof ColumnsConditions))
+        {
+            throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+        }
+        ((ColumnsConditions)condition).addConditions(conds, variables);
+    }
+
+    public IDiskAtomFilter readFilter()
+    {
+        assert !conditions.isEmpty();
+        ColumnSlice[] slices = new ColumnSlice[conditions.size()];
+        int i = 0;
+        // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists
+        // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the
+        // row marker for that (see #6623)
+        for (Map.Entry<ByteBuffer, RowCondition> entry : conditions.entrySet())
+            slices[i++] = new ColumnSlice(entry.getKey(), entry.getValue().rowPrefix.buildAsEndOfRange());
+
+        return new SliceQueryFilter(slices, false, slices.length, cfm.clusteringKeyColumns().size());
+    }
+
+    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+    {
+        for (RowCondition condition : conditions.values())
+        {
+            if (!condition.appliesTo(current))
+                return false;
+        }
+        return true;
+    }
+
+    private static abstract class RowCondition
+    {
+        public final ColumnNameBuilder rowPrefix;
+        protected final long now;
+
+        protected RowCondition(ColumnNameBuilder rowPrefix, long now)
+        {
+            this.rowPrefix = rowPrefix;
+            this.now = now;
+        }
+
+        public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+    }
+
+    private static class NotExistCondition extends RowCondition
+    {
+        private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public boolean appliesTo(ColumnFamily current)
+        {
+            if (current == null)
+                return true;
+
+            Iterator<Column> iter = current.iterator(new ColumnSlice[]{ new ColumnSlice(rowPrefix.build(), rowPrefix.buildAsEndOfRange()) });
+            while (iter.hasNext())
+                if (iter.next().isLive(now))
+                    return false;
+            return true;
+        }
+    }
+
+    private static class ColumnsConditions extends RowCondition
+    {
+        private final Map<ColumnIdentifier, ColumnCondition> conditions = new HashMap<>();
+
+        private ColumnsConditions(ColumnNameBuilder rowPrefix, long now)
+        {
+            super(rowPrefix, now);
+        }
+
+        public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            for (ColumnCondition condition : conds)
+            {
+                // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
+                // different list of variables. So attach them to the condition directly, it's not particulary elegant but its simpler
+                ColumnCondition previous = conditions.put(condition.column.name, condition.attach(variables));
+                // If 2 conditions are actually equal, let it slide
+                if (previous != null && !previous.equalsTo(condition))
+                    throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
+            }
+        }
+
+        public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+        {
+            if (current == null)
+                return conditions.isEmpty();
+
+            for (ColumnCondition condition : conditions.values())
+                if (!condition.appliesTo(rowPrefix, current, now))
+                    return false;
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 8974523..5c3fcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -32,10 +32,12 @@ public class ColumnGroupMap
 {
     private final ByteBuffer[] fullPath;
     private final Map<ByteBuffer, Value> map = new HashMap<ByteBuffer, Value>();
+    public final boolean isStatic; // Whether or not the group correspond to "static" cells
 
-    private ColumnGroupMap(ByteBuffer[] fullPath)
+    private ColumnGroupMap(ByteBuffer[] fullPath, boolean isStatic)
     {
         this.fullPath = fullPath;
+        this.isStatic = isStatic;
     }
 
     private void add(ByteBuffer[] fullName, int idx, Column column)
@@ -126,7 +128,7 @@ public class ColumnGroupMap
 
             if (currentGroup == null)
             {
-                currentGroup = new ColumnGroupMap(current);
+                currentGroup = new ColumnGroupMap(current, composite.isStaticName(c.name()));
                 currentGroup.add(current, idx, c);
                 previous = current;
                 return;
@@ -135,7 +137,8 @@ public class ColumnGroupMap
             if (!isSameGroup(current))
             {
                 groups.add(currentGroup);
-                currentGroup = new ColumnGroupMap(current);
+                // Note that we know that only the first group built can be static
+                currentGroup = new ColumnGroupMap(current, false);
             }
             currentGroup.add(current, idx, c);
             previous = current;
@@ -167,5 +170,25 @@ public class ColumnGroupMap
             }
             return groups;
         }
+
+        public boolean isEmpty()
+        {
+            return currentGroup == null && groups.isEmpty();
+        }
+
+        public ColumnGroupMap firstGroup()
+        {
+            if (currentGroup != null)
+            {
+                groups.add(currentGroup);
+                currentGroup = null;
+            }
+            return groups.get(0);
+        }
+
+        public void discardFirst()
+        {
+            groups.remove(0);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 3ef6f5a..376fa4a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -87,6 +87,15 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         if (cfm.getCfDef().isCompact && cd.type != ColumnDefinition.Type.REGULAR)
             throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.type, columnName));
 
+        // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
+        // CompositesIndex) and maybe we should, but that means a query like:
+        //     SELECT * FROM foo WHERE static_column = 'bar'
+        // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
+        // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
+        // such indexing is actually useful.
+        if (cd.type == ColumnDefinition.Type.STATIC)
+            throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
+
         if (cd.getValidator().isCollection() && !properties.isCustom)
             throw new InvalidRequestException("Indexes on collections are no yet supported");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 74f7570..632194c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -52,14 +52,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
     private ByteBuffer valueAlias;
 
     private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
+    private final Set<ColumnIdentifier> staticColumns;
     private final CFPropDefs properties;
     private final boolean ifNotExists;
 
-    public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists)
+    public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
     {
         super(name);
         this.properties = properties;
         this.ifNotExists = ifNotExists;
+        this.staticColumns = staticColumns;
 
         try
         {
@@ -101,7 +103,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
         for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
         {
-            columnDefs.put(col.getKey().key, ColumnDefinition.regularDef(col.getKey().key, col.getValue(), componentIndex));
+            ColumnIdentifier id = col.getKey();
+            columnDefs.put(id.key, staticColumns.contains(id)
+                                   ? ColumnDefinition.staticDef(id.key, col.getValue(), componentIndex)
+                                   : ColumnDefinition.regularDef(id.key, col.getValue(), componentIndex));
         }
 
         return columnDefs;
@@ -166,6 +171,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
         private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
         private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
         private final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<ColumnIdentifier, Boolean>(); // Insertion ordering is important
+        private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
 
         private boolean useCompactStorage;
         private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
@@ -195,7 +201,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
 
             properties.validate();
 
-            CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists);
+            CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
 
             Map<ByteBuffer, CollectionType> definedCollections = null;
             for (Map.Entry<ColumnIdentifier, CQL3Type> entry : definitions.entrySet())
@@ -225,6 +231,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
                 if (t instanceof CounterColumnType)
                     throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+                if (staticColumns.contains(alias))
+                    throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
                 keyTypes.add(t);
             }
             stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
@@ -260,10 +268,13 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 {
                     if (definedCollections != null)
                         throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
-                    stmt.columnAliases.add(columnAliases.get(0).key);
-                    stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
+                    ColumnIdentifier alias = columnAliases.get(0);
+                    stmt.columnAliases.add(alias.key);
+                    stmt.comparator = getTypeAndRemove(stmt.columns, alias);
                     if (stmt.comparator instanceof CounterColumnType)
-                        throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
+                        throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+                    if (staticColumns.contains(alias))
+                        throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
                 }
                 else
                 {
@@ -275,6 +286,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
                         AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
                         if (type instanceof CounterColumnType)
                             throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
+                        if (staticColumns.contains(t))
+                            throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
                         types.add(type);
                     }
 
@@ -298,6 +311,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
                 }
             }
 
+            if (!staticColumns.isEmpty())
+            {
+                // Only CQL3 tables can have static columns
+                if (useCompactStorage)
+                    throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
+                // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
+                if (columnAliases.isEmpty())
+                    throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+            }
+
             if (useCompactStorage && !stmt.columnAliases.isEmpty())
             {
                 if (stmt.columns.isEmpty())
@@ -373,10 +396,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
             return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
         }
 
-        public void addDefinition(ColumnIdentifier def, CQL3Type type)
+        public void addDefinition(ColumnIdentifier def, CQL3Type type, boolean isStatic)
         {
             definedNames.add(def);
             definitions.put(def, type);
+            if (isStatic)
+                staticColumns.add(def);
         }
 
         public void addKeyAliases(List<ColumnIdentifier> aliases)