You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/07/16 15:28:08 UTC

[1/3] git commit: Add lists, sets and maps support

Updated Branches:
  refs/heads/trunk 3c92afdd8 -> 91bdf7fb4


Add lists, sets and maps support

patch by slebresne; reviewed by jbellis for CASSANDRA-3647


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/91bdf7fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/91bdf7fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/91bdf7fb

Branch: refs/heads/trunk
Commit: 91bdf7fb4220b27e9566c6673bf5dbd14153017c
Parents: 3c92afd
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 5 11:01:51 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jul 16 15:27:15 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../org/apache/cassandra/cql3/CFDefinition.java    |   28 ++-
 src/java/org/apache/cassandra/cql3/CFPropDefs.java |   40 ---
 .../apache/cassandra/cql3/ColumnIdentifier.java    |   12 +-
 src/java/org/apache/cassandra/cql3/Cql.g           |  167 ++++++++---
 src/java/org/apache/cassandra/cql3/Operation.java  |   91 +++++--
 src/java/org/apache/cassandra/cql3/ParsedType.java |  129 ++++++++
 .../org/apache/cassandra/cql3/QueryProcessor.java  |    6 +-
 src/java/org/apache/cassandra/cql3/Term.java       |   73 +++--
 .../apache/cassandra/cql3/UpdateParameters.java    |   64 ++++
 src/java/org/apache/cassandra/cql3/Value.java      |  121 ++++++++
 .../cql3/statements/AlterTableStatement.java       |   35 ++-
 .../cassandra/cql3/statements/BatchStatement.java  |    4 +-
 .../cassandra/cql3/statements/ColumnGroupMap.java  |  175 +++++++++++
 .../statements/CreateColumnFamilyStatement.java    |   55 +++-
 .../cassandra/cql3/statements/DeleteStatement.java |  147 ++++++---
 .../cql3/statements/ModificationStatement.java     |   61 ++++-
 .../cassandra/cql3/statements/SelectStatement.java |  164 ++++-------
 .../apache/cassandra/cql3/statements/Selector.java |   78 +++++-
 .../cassandra/cql3/statements/UpdateStatement.java |  225 +++++++++-----
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    2 +-
 .../apache/cassandra/db/SliceFromReadCommand.java  |    3 +-
 .../apache/cassandra/db/filter/ColumnCounter.java  |  108 +++++++
 .../apache/cassandra/db/filter/ExtendedFilter.java |    8 +
 .../cassandra/db/filter/SliceQueryFilter.java      |   62 +++-
 .../db/marshal/AbstractCompositeType.java          |   11 +-
 .../apache/cassandra/db/marshal/AbstractType.java  |   23 ++
 .../cassandra/db/marshal/CollectionType.java       |  135 +++++++++
 .../db/marshal/ColumnToCollectionType.java         |  135 +++++++++
 .../apache/cassandra/db/marshal/CompositeType.java |   44 ++-
 .../org/apache/cassandra/db/marshal/EmptyType.java |   67 ++++
 .../org/apache/cassandra/db/marshal/ListType.java  |  235 +++++++++++++++
 .../org/apache/cassandra/db/marshal/MapType.java   |  130 ++++++++
 .../org/apache/cassandra/db/marshal/SetType.java   |  134 ++++++++
 .../apache/cassandra/db/marshal/TypeParser.java    |   91 ++++++-
 src/java/org/apache/cassandra/utils/UUIDGen.java   |   26 ++-
 36 files changed, 2451 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb66fd6..5578aae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,7 +27,8 @@
  * stream compressed sstables directly with java nio (CASSANDRA-4297)
  * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
  * Add column metadata to system column families (CASSANDRA-4018)
- * (cql3) always use composite types by default (CASSANDRA-4329)
+ * (cql3) Always use composite types by default (CASSANDRA-4329)
+ * (cql3) Add support for set, map and list (CASSANDRA-3647)
 
 
 1.1.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 34af56e..23edd62 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -55,6 +56,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
     // option when creating a table in that "static CF" without a composite type will have isCompact == false
     // even though one must use 'WITH COMPACT STORAGE' to declare them.
     public final boolean isCompact;
+    public final boolean hasCollections;
 
     public CFDefinition(CFMetaData cfm)
     {
@@ -68,6 +70,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
             {
                 // "dense" composite
                 this.isCompact = true;
+                this.hasCollections = false;
                 for (int i = 0; i < composite.types.size(); i++)
                 {
                     ColumnIdentifier id = getColumnId(cfm, i);
@@ -81,7 +84,20 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
                 this.isCompact = false;
                 this.value = null;
                 assert cfm.getValueAlias() == null;
-                for (int i = 0; i < composite.types.size() - 1; i++)
+                // check for collection type
+                int last = composite.types.size() - 1;
+                AbstractType<?> lastType = composite.types.get(last);
+                if (lastType instanceof ColumnToCollectionType)
+                {
+                    --last;
+                    this.hasCollections = true;
+                }
+                else
+                {
+                    this.hasCollections = false;
+                }
+
+                for (int i = 0; i < last; i++)
                 {
                     ColumnIdentifier id = getColumnId(cfm, i);
                     this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, composite.types.get(i)));
@@ -97,6 +113,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
         else
         {
             this.isComposite = false;
+            this.hasCollections = false;
             if (cfm.getColumn_metadata().isEmpty())
             {
                 // dynamic CF
@@ -145,6 +162,15 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
              : new ColumnIdentifier(cfm.getValueAlias(), definitionType);
     }
 
+    public ColumnToCollectionType getCollectionType()
+    {
+        if (!hasCollections)
+            return null;
+
+        CompositeType composite = (CompositeType)cfm.comparator;
+        return (ColumnToCollectionType)composite.types.get(composite.types.size() - 1);
+    }
+
     public Name get(ColumnIdentifier name)
     {
         if (name.equals(key.name))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 37bcc06..6b7a56e 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -49,7 +49,6 @@ public class CFPropDefs
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
 
     // Maps CQL short names to the respective Cassandra comparator/validator class names
-    public static final Map<String, String> comparators = new HashMap<String, String>();
     public static final Set<String> keywords = new HashSet<String>();
     public static final Set<String> obsoleteKeywords = new HashSet<String>();
     public static final Set<String> allowedKeywords = new HashSet<String>();
@@ -59,23 +58,6 @@ public class CFPropDefs
 
     static
     {
-        comparators.put("ascii", "AsciiType");
-        comparators.put("bigint", "LongType");
-        comparators.put("blob", "BytesType");
-        comparators.put("boolean", "BooleanType");
-        comparators.put("counter", "CounterColumnType");
-        comparators.put("decimal", "DecimalType");
-        comparators.put("double", "DoubleType");
-        comparators.put("float", "FloatType");
-        comparators.put("inet", "InetAddressType");
-        comparators.put("int", "Int32Type");
-        comparators.put("text", "UTF8Type");
-        comparators.put("timestamp", "DateType");
-        comparators.put("uuid", "UUIDType");
-        comparators.put("varchar", "UTF8Type");
-        comparators.put("varint", "IntegerType");
-        comparators.put("timeuuid", "TimeUUIDType");
-
         keywords.add(KW_COMMENT);
         keywords.add(KW_READREPAIRCHANCE);
         keywords.add(KW_DCLOCALREADREPAIRCHANCE);
@@ -97,28 +79,6 @@ public class CFPropDefs
             put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
     }};
 
-    public static AbstractType<?> parseType(String type) throws InvalidRequestException
-    {
-        try
-        {
-            String className = comparators.get(type);
-            if (className == null)
-                className = type;
-            return TypeParser.parse(className);
-        }
-        catch (ConfigurationException e)
-        {
-            InvalidRequestException ex = new InvalidRequestException(e.toString());
-            ex.initCause(e);
-            throw ex;
-        }
-    }
-
-    /* If not comparator/validator is not specified, default to text (BytesType is the wrong default for CQL
-     * since it uses hex terms).  If the value specified is not found in the comparators map, assume the user
-     * knows what they are doing (a custom comparator/validator for example), and pass it on as-is.
-     */
-
     public void validate() throws ConfigurationException
     {
         // Catch the case where someone passed a kwarg that is not recognized.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index fdbf482..17d97d3 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.cql3.statements.Selector;
 /**
  * Represents an identifer for a CQL column definition.
  */
-public class ColumnIdentifier implements Comparable<ColumnIdentifier>, Selector
+public class ColumnIdentifier extends Selector implements Comparable<ColumnIdentifier>
 {
     public final ByteBuffer key;
     private final String text;
@@ -76,14 +76,4 @@ public class ColumnIdentifier implements Comparable<ColumnIdentifier>, Selector
     {
         return this;
     }
-
-    public boolean hasFunction()
-    {
-        return false;
-    }
-
-    public Selector.Function function()
-    {
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index bbebedc..04c2391 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -27,6 +27,7 @@ options {
     package org.apache.cassandra.cql3;
 
     import java.util.ArrayList;
+    import java.util.Arrays;
     import java.util.Collections;
     import java.util.HashMap;
     import java.util.LinkedHashMap;
@@ -34,6 +35,8 @@ options {
     import java.util.Map;
 
     import org.apache.cassandra.cql3.statements.*;
+    import org.apache.cassandra.config.ConfigurationException;
+    import org.apache.cassandra.db.marshal.CollectionType;
     import org.apache.cassandra.utils.Pair;
     import org.apache.cassandra.thrift.ConsistencyLevel;
     import org.apache.cassandra.thrift.InvalidRequestException;
@@ -190,8 +193,7 @@ selector returns [Selector s]
     ;
 
 selectCountClause returns [List<Selector> expr]
-    : ids=cidentList { $expr = new ArrayList<Selector>(ids); }
-    | '\*'           { $expr = Collections.<Selector>emptyList();}
+    : '\*'           { $expr = Collections.<Selector>emptyList();}
     | i=INTEGER      { if (!i.getText().equals("1")) addRecognitionError("Only COUNT(1) is supported, got COUNT(" + i.getText() + ")"); $expr = Collections.<Selector>emptyList();}
     ;
 
@@ -219,15 +221,15 @@ insertStatement returns [UpdateStatement expr]
     @init {
         Attributes attrs = new Attributes();
         List<ColumnIdentifier> columnNames  = new ArrayList<ColumnIdentifier>();
-        List<Term> columnValues = new ArrayList<Term>();
+        List<Value> columnValues = new ArrayList<Value>();
     }
     : K_INSERT K_INTO cf=columnFamilyName
           '(' c1=cident { columnNames.add(c1); }  ( ',' cn=cident { columnNames.add(cn); } )+ ')'
         K_VALUES
-          '(' v1=term { columnValues.add(v1); } ( ',' vn=term { columnValues.add(vn); } )+ ')'
+          '(' v1=value { columnValues.add(v1); } ( ',' vn=value { columnValues.add(vn); } )+ ')'
         ( usingClause[attrs] )?
       {
-          $expr = new UpdateStatement(cf, columnNames, columnValues, attrs);
+          $expr = new UpdateStatement(cf, attrs, columnNames, columnValues);
       }
     ;
 
@@ -258,7 +260,7 @@ usingClauseObjective[Attributes attrs]
 updateStatement returns [UpdateStatement expr]
     @init {
         Attributes attrs = new Attributes();
-        Map<ColumnIdentifier, Operation> columns = new HashMap<ColumnIdentifier, Operation>();
+        List<Pair<ColumnIdentifier, Operation>> columns = new ArrayList<Pair<ColumnIdentifier, Operation>>();
     }
     : K_UPDATE cf=columnFamilyName
       ( usingClause[attrs] )?
@@ -278,9 +280,9 @@ updateStatement returns [UpdateStatement expr]
 deleteStatement returns [DeleteStatement expr]
     @init {
         Attributes attrs = new Attributes();
-        List<ColumnIdentifier> columnsList = Collections.emptyList();
+        List<Selector> columnsList = Collections.emptyList();
     }
-    : K_DELETE ( ids=cidentList { columnsList = ids; } )?
+    : K_DELETE ( ids=deleteSelection { columnsList = ids; } )?
       K_FROM cf=columnFamilyName
       ( usingClauseDelete[attrs] )?
       K_WHERE wclause=whereClause
@@ -289,6 +291,14 @@ deleteStatement returns [DeleteStatement expr]
       }
     ;
 
+deleteSelection returns [List<Selector> expr]
+    : t1=deleteSelector { $expr = new ArrayList<Selector>(); $expr.add(t1); } (',' tN=deleteSelector { $expr.add(tN); })*
+    ;
+
+deleteSelector returns [Selector s]
+    : c=cident                { $s = c; }
+    | c=cident '[' t=term ']' { $s = new Selector.WithKey(c, t); }
+    ;
 
 /**
  * BEGIN BATCH [USING CONSISTENCY <LVL>]
@@ -461,12 +471,32 @@ cfOrKsName[CFName name, boolean isKs]
     | k=unreserved_keyword { if (isKs) $name.setKeyspace(k, false); else $name.setColumnFamily(k, false); }
     ;
 
-cidentList returns [List<ColumnIdentifier> items]
-    @init{ $items = new ArrayList<ColumnIdentifier>(); }
-    :  t1=cident { $items.add(t1); } (',' tN=cident { $items.add(tN); })*
+// Values (includes prepared statement markers)
+value returns [Value value]
+    : t=term               { $value = t; }
+    | c=collection_literal { $value = c; }
+    ;
+
+collection_literal returns [Value value]
+    : ll=list_literal { $value = ll; }
+    | sl=set_literal  { $value = sl; }
+    | ml=map_literal  { $value = ml; }
+    ;
+
+list_literal returns [Value.ListLiteral value]
+    : '[' { Value.ListLiteral l = new Value.ListLiteral(); } ( t1=term { l.add(t1); } ( ',' tn=term { l.add(tn); } )* )? ']' { $value = l; }
+    ;
+
+set_literal returns [Value.SetLiteral value]
+    : '{' { Value.SetLiteral s = new Value.SetLiteral(); } ( t1=term { s.add(t1); } ( ',' tn=term { s.add(tn); } )* )? '}'  { $value = s; }
+    ;
+
+map_literal returns [Value.MapLiteral value]
+    // Note that "{}" is ambiguous between an empty map and an empty set. We force it to parse as a set (this rule requires at least one key:value pair) and deal with it later based on the type of the column.
+    : '{' { Value.MapLiteral m = new Value.MapLiteral(); } k1=term ':' v1=term { m.put(k1, v1); } ( ',' kn=term ':' vn=term { m.put(kn, vn); } )* '}'
+       { $value = m; }
     ;
 
-// Values (includes prepared statement markers)
 extendedTerm returns [Term term]
     : K_TOKEN '(' t=term ')' { $term = Term.tokenOf(t); }
     | t=term                 { $term = t; }
@@ -482,19 +512,45 @@ intTerm returns [Term integer]
     | t=QMARK   { $integer = new Term($t.text, $t.type, ++currentBindMarkerIdx); }
     ;
 
-termPairWithOperation[Map<ColumnIdentifier, Operation> columns]
+termPairWithOperation[List<Pair<ColumnIdentifier, Operation>> columns]
     : key=cident '='
-        ( value=term { columns.put(key, new Operation(value)); }
-        | c=cident ( '+'     v=intTerm { columns.put(key, new Operation(c, Operation.Type.PLUS, v)); }
-                   | op='-'? v=intTerm
-                     {
-                       validateMinusSupplied(op, v, input);
-                       if (op == null)
-                           v = new Term(-(Long.valueOf(v.getText())), v.getType());
-                       columns.put(key, new Operation(c, Operation.Type.MINUS, v));
-                     }
-                    )
+        ( v=value { columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Set(v))); }
+        | c=cident op=operation
+          {
+              if (!key.equals(c))
+                  addRecognitionError("Only expressions like X = X <op> <value> are supported.");
+              columns.add(Pair.<ColumnIdentifier, Operation>create(key, op));
+          }
+        | ll=list_literal '+' c=cident
+          {
+              if (!key.equals(c))
+                  addRecognitionError("Only expressions like X = <value> + X are supported.");
+              columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Function(CollectionType.Function.PREPEND, ll)));
+          }
         )
+    | key=cident '[' t=term ']' '=' vv=term
+      {
+          List<Term> args = Arrays.asList(t, vv);
+          columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Function(CollectionType.Function.SET, args)));
+      }
+    ;
+
+operation returns [Operation op]
+    : '+' v=intTerm { $op = new Operation.Counter(v, false); }
+    | sign='-'? v=intTerm
+      {
+          validateMinusSupplied(sign, v, input);
+          if (sign == null)
+              v = new Term(-(Long.valueOf(v.getText())), v.getType());
+          $op = new Operation.Counter(v, true);
+      }
+    | '+' ll=list_literal { $op = new Operation.Function(CollectionType.Function.APPEND, ll); }
+    | '-' ll=list_literal { $op = new Operation.Function(CollectionType.Function.DISCARD_LIST, ll); }
+
+    | '+' sl=set_literal { $op = new Operation.Function(CollectionType.Function.ADD, sl.asList()); }
+    | '-' sl=set_literal { $op = new Operation.Function(CollectionType.Function.DISCARD_SET, sl.asList()); }
+
+    | '+' ml=map_literal { $op = new Operation.Function(CollectionType.Function.SET, ml.asList()); }
     ;
 
 property returns [String str]
@@ -519,29 +575,45 @@ relation returns [Relation rel]
       '(' f1=term { $rel.addInValue(f1); } (',' fN=term { $rel.addInValue(fN); } )* ')'
     ;
 
-comparatorType returns [String str]
-    : c=native_type    { $str=c; }
-    | s=STRING_LITERAL { $str = $s.text; }
+comparatorType returns [ParsedType t]
+    : c=native_type     { $t = c; }
+    | c=collection_type { $t = c; }
+    | s=STRING_LITERAL
+      {
+        try {
+            $t = new ParsedType.Custom($s.text);
+        } catch (ConfigurationException e) {
+            addRecognitionError("Cannot parse type " + $s.text + ": " + e.getMessage());
+        }
+      }
     ;
 
-native_type returns [String str]
-    : c=( K_ASCII
-        | K_BIGINT
-        | K_BLOB
-        | K_BOOLEAN
-        | K_COUNTER
-        | K_DECIMAL
-        | K_DOUBLE
-        | K_FLOAT
-        | K_INET
-        | K_INT
-        | K_TEXT
-        | K_TIMESTAMP
-        | K_UUID
-        | K_VARCHAR
-        | K_VARINT
-        | K_TIMEUUID
-      ) { return $c.text; }
+native_type returns [ParsedType t]
+    : K_ASCII     { $t = ParsedType.Native.ASCII; }
+    | K_BIGINT    { $t = ParsedType.Native.BIGINT; }
+    | K_BLOB      { $t = ParsedType.Native.BLOB; }
+    | K_BOOLEAN   { $t = ParsedType.Native.BOOLEAN; }
+    | K_COUNTER   { $t = ParsedType.Native.COUNTER; }
+    | K_DECIMAL   { $t = ParsedType.Native.DECIMAL; }
+    | K_DOUBLE    { $t = ParsedType.Native.DOUBLE; }
+    | K_FLOAT     { $t = ParsedType.Native.FLOAT; }
+    | K_INET      { $t = ParsedType.Native.INET;}
+    | K_INT       { $t = ParsedType.Native.INT; }
+    | K_TEXT      { $t = ParsedType.Native.TEXT; }
+    | K_TIMESTAMP { $t = ParsedType.Native.TIMESTAMP; }
+    | K_UUID      { $t = ParsedType.Native.UUID; }
+    | K_VARCHAR   { $t = ParsedType.Native.VARCHAR; }
+    | K_VARINT    { $t = ParsedType.Native.VARINT; }
+    | K_TIMEUUID  { $t = ParsedType.Native.TIMEUUID; }
+    ;
+
+collection_type returns [ParsedType pt]
+    : K_MAP  '<' t1=comparatorType ',' t2=comparatorType '>'
+        { try { $pt = ParsedType.Collection.map(t1, t2); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+    | K_LIST '<' t=comparatorType '>'
+        { try { $pt = ParsedType.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+    | K_SET  '<' t=comparatorType '>'
+        { try { $pt = ParsedType.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
     ;
 
 unreserved_keyword returns [String str]
@@ -556,8 +628,10 @@ unreserved_keyword returns [String str]
         | K_TYPE
         | K_VALUES
         | K_WRITETIME
+        | K_MAP
+        | K_LIST
         ) { $str = $k.text; }
-    | t=native_type { $str = t; }
+    | t=native_type { $str = t.toString(); }
     ;
 
 
@@ -634,6 +708,9 @@ K_TIMEUUID:    T I M E U U I D;
 K_TOKEN:       T O K E N;
 K_WRITETIME:   W R I T E T I M E;
 
+K_MAP:         M A P;
+K_LIST:        L I S T;
+
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');
 fragment B: ('b'|'B');

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 4b7989e..e7291c3 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -17,37 +17,92 @@
  */
 package org.apache.cassandra.cql3;
 
-public class Operation
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public abstract class Operation
 {
-    public static enum Type { PLUS, MINUS }
+    public static enum Type { SET, COUNTER, FUNCTION }
 
     public final Type type;
-    public final ColumnIdentifier ident;
-    public final Term value;
 
-    // unary operation
-    public Operation(Term a)
+    protected Operation(Type type)
     {
-        this(null, null, a);
+        this.type = type;
     }
 
-    // binary operation
-    public Operation(ColumnIdentifier a, Type type, Term b)
+    public abstract Iterable<Term> allTerms();
+
+    public static class Set extends Operation
     {
-        this.ident = a;
-        this.type = type;
-        this.value = b;
+        public final Value value;
+
+        public Set(Value value)
+        {
+            super(Type.SET);
+            this.value = value;
+        }
+
+        @Override
+        public String toString()
+        {
+            return " = " + value;
+        }
+
+        public List<Term> allTerms()
+        {
+            return value.asList();
+        }
     }
 
-    public boolean isUnary()
+    public static class Counter extends Operation
     {
-        return type == null && ident == null;
+        public final Term value;
+        public final boolean isSubstraction;
+
+        public Counter(Term value, boolean isSubstraction)
+        {
+            super(Type.COUNTER);
+            this.value = value;
+            this.isSubstraction = isSubstraction;
+        }
+
+        @Override
+        public String toString()
+        {
+            return (isSubstraction ? "-" : "+") + "= " + value;
+        }
+
+        public Iterable<Term> allTerms()
+        {
+            return Collections.singletonList(value);
+        }
     }
 
-    public String toString()
+    public static class Function extends Operation
     {
-        return (isUnary())
-                ? String.format("UnaryOperation(%s)", value)
-                : String.format("BinaryOperation(%s, %s, %s)", ident, type, value);
+        public final CollectionType.Function fct;
+        public final List<Term> arguments;
+
+        public Function(CollectionType.Function fct, List<Term> arguments)
+        {
+            super(Type.FUNCTION);
+            this.fct = fct;
+            this.arguments = arguments;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "." + fct + arguments;
+        }
+
+        public Iterable<Term> allTerms()
+        {
+            return arguments;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/ParsedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ParsedType.java b/src/java/org/apache/cassandra/cql3/ParsedType.java
new file mode 100644
index 0000000..28e6be3
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/ParsedType.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public interface ParsedType
+{
+    public boolean isCollection();
+    public AbstractType<?> getType();
+
+    public enum Native implements ParsedType
+    {
+        ASCII    (AsciiType.instance),
+        BIGINT   (LongType.instance),
+        BLOB     (BytesType.instance),
+        BOOLEAN  (BooleanType.instance),
+        COUNTER  (CounterColumnType.instance),
+        DECIMAL  (DecimalType.instance),
+        DOUBLE   (DoubleType.instance),
+        FLOAT    (FloatType.instance),
+        INET     (InetAddressType.instance),
+        INT      (Int32Type.instance),
+        TEXT     (UTF8Type.instance),
+        TIMESTAMP(DateType.instance),
+        UUID     (UUIDType.instance),
+        VARCHAR  (UTF8Type.instance),
+        VARINT   (IntegerType.instance),
+        TIMEUUID (TimeUUIDType.instance);
+
+        private final AbstractType<?> type;
+
+        private Native(AbstractType<?> type)
+        {
+            this.type = type;
+        }
+
+        public boolean isCollection()
+        {
+            return false;
+        }
+
+        public AbstractType<?> getType()
+        {
+            return type;
+        }
+    }
+
+    public static class Custom implements ParsedType
+    {
+        private final AbstractType<?> type;
+
+        public Custom(String className) throws ConfigurationException
+        {
+            this.type =  TypeParser.parse(className);
+        }
+
+        public boolean isCollection()
+        {
+            return false;
+        }
+
+        public AbstractType<?> getType()
+        {
+            return type;
+        }
+    }
+
+    public static class Collection implements ParsedType
+    {
+        CollectionType type;
+
+        private Collection(CollectionType type)
+        {
+            this.type = type;
+        }
+
+        public static Collection map(ParsedType t1, ParsedType t2) throws InvalidRequestException
+        {
+            if (t1.isCollection() || t2.isCollection())
+                throw new InvalidRequestException("map type cannot contain another collection");
+
+            return new Collection(MapType.getInstance(t1.getType(), t2.getType()));
+        }
+
+        public static Collection list(ParsedType t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("list type cannot contain another collection");
+
+            return new Collection(ListType.getInstance(t.getType()));
+        }
+
+        public static Collection set(ParsedType t) throws InvalidRequestException
+        {
+            if (t.isCollection())
+                throw new InvalidRequestException("set type cannot contain another collection");
+
+            return new Collection(SetType.getInstance(t.getType()));
+        }
+
+        public boolean isCollection()
+        {
+            return true;
+        }
+
+        public AbstractType<?> getType()
+        {
+            return type;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dde9b26..d084480 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -222,15 +222,15 @@ public class QueryProcessor
             CqlLexer lexer = new CqlLexer(stream);
             TokenStream tokenStream = new CommonTokenStream(lexer);
             CqlParser parser = new CqlParser(tokenStream);
-    
+
             // Parse the query string to a statement instance
             ParsedStatement statement = parser.query();
-    
+
             // The lexer and parser queue up any errors they may have encountered
             // along the way, if necessary, we turn them into exceptions here.
             lexer.throwLastRecognitionError();
             parser.throwLastRecognitionError();
-    
+
             return statement;
         }
         catch (RuntimeException re)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index 04fa5fa..81be6de 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.cassandra.config.ConfigurationException;
@@ -32,14 +34,36 @@ import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
 /** A term parsed from a CQL statement. */
-public class Term
+public class Term implements Value
 {
+    public enum Type
+    {
+        STRING, INTEGER, UUID, FLOAT, QMARK;
+
+        static Type forInt(int type)
+        {
+            if ((type == CqlParser.STRING_LITERAL) || (type == CqlParser.IDENT))
+                return STRING;
+            else if (type == CqlParser.INTEGER)
+                return INTEGER;
+            else if (type == CqlParser.UUID)
+                return UUID;
+            else if (type == CqlParser.FLOAT)
+                return FLOAT;
+            else if (type == CqlParser.QMARK)
+                return QMARK;
+
+            // FIXME: handled scenario that should never occur.
+            return null;
+        }
+    }
+
     private final String text;
-    private final TermType type;
+    private final Type type;
     public final int bindIndex;
     public final boolean isToken;
 
-    private Term(String text, TermType type, int bindIndex, boolean isToken)
+    private Term(String text, Type type, int bindIndex, boolean isToken)
     {
         this.text = text == null ? "" : text;
         this.type = type;
@@ -47,7 +71,7 @@ public class Term
         this.isToken = isToken;
     }
 
-    public Term(String text, TermType type)
+    public Term(String text, Type type)
     {
         this(text, type, -1, false);
     }
@@ -61,17 +85,17 @@ public class Term
      */
     public Term(String text, int type)
     {
-        this(text, TermType.forInt(type));
+        this(text, Type.forInt(type));
     }
 
-    public Term(long value, TermType type)
+    public Term(long value, Type type)
     {
         this(String.valueOf(value), type);
     }
 
     public Term(String text, int type, int index)
     {
-        this(text, TermType.forInt(type), index, false);
+        this(text, Type.forInt(type), index, false);
     }
 
     public static Term tokenOf(Term t)
@@ -119,7 +143,7 @@ public class Term
 
     public Token getAsToken(AbstractType<?> validator, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
     {
-        if (!(isToken || type == TermType.STRING))
+        if (!(isToken || type == Type.STRING))
             throw new InvalidRequestException("Invalid value for token (use a string literal of the token value or the token() function)");
 
         try
@@ -146,14 +170,19 @@ public class Term
      *
      * @return the type
      */
-    public TermType getType()
+    public Type getType()
     {
         return type;
     }
 
     public boolean isBindMarker()
     {
-        return type == TermType.QMARK;
+        return type == Type.QMARK;
+    }
+
+    public List<Term> asList()
+    {
+        return Collections.singletonList(this);
     }
 
     @Override
@@ -182,7 +211,7 @@ public class Term
         if (getClass() != obj.getClass())
             return false;
         Term other = (Term) obj;
-        if (type==TermType.QMARK) return false; // markers are never equal
+        if (type==Type.QMARK) return false; // markers are never equal
         if (text == null)
         {
             if (other.text != null)
@@ -196,25 +225,3 @@ public class Term
         return true;
     }
 }
-
-enum TermType
-{
-    STRING, INTEGER, UUID, FLOAT, QMARK;
-
-    static TermType forInt(int type)
-    {
-        if ((type == CqlParser.STRING_LITERAL) || (type == CqlParser.IDENT))
-            return STRING;
-        else if (type == CqlParser.INTEGER)
-            return INTEGER;
-        else if (type == CqlParser.UUID)
-          return UUID;
-        else if (type == CqlParser.FLOAT)
-            return FLOAT;
-        else if (type == CqlParser.QMARK)
-            return QMARK;
-
-        // FIXME: handled scenario that should never occur.
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
new file mode 100644
index 0000000..1372c87
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+
+/**
+ * A simple container that simplifies passing parameters to collection methods.
+ */
+public class UpdateParameters
+{
+    public final List<ByteBuffer> variables;
+    public final long timestamp;
+    private final int ttl;
+    public final int localDeletionTime;
+
+    public UpdateParameters(List<ByteBuffer> variables, long timestamp, int ttl)
+    {
+        this.variables = variables;
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);
+    }
+
+    public Column makeColumn(ByteBuffer name, ByteBuffer value)
+    {
+        return ttl > 0
+             ? new ExpiringColumn(name, value, timestamp, ttl)
+             : new Column(name, value, timestamp);
+    }
+
+    public Column makeTombstone(ByteBuffer name)
+    {
+        return new DeletedColumn(name, localDeletionTime, timestamp);
+    }
+
+    public RangeTombstone makeRangeTombstone(ByteBuffer start, ByteBuffer end)
+    {
+        return new RangeTombstone(start, end, timestamp, localDeletionTime);
+    }
+
+    public RangeTombstone makeTombstoneForOverwrite(ByteBuffer start, ByteBuffer end)
+    {
+        return new RangeTombstone(start, end, timestamp - 1, localDeletionTime);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Value.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Value.java b/src/java/org/apache/cassandra/cql3/Value.java
new file mode 100644
index 0000000..64f588b
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/Value.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public interface Value
+{
+    public List<Term> asList();
+
+    public interface CollectionLiteral extends Value
+    {
+        /**
+         * Validates that this literal is compatible with the provided column and
+         * throws an InvalidRequestException if not.
+         */
+        public void validateType(CFDefinition.Name value) throws InvalidRequestException;
+
+        /**
+         * Returns whether this literal is empty or not.
+         */
+        public boolean isEmpty();
+
+        /**
+         * Returns the internal function to use to construct this literal.
+         */
+        public CollectionType.Function constructionFunction();
+    }
+
+    public static class MapLiteral extends HashMap<Term, Term> implements CollectionLiteral
+    {
+        public List<Term> asList()
+        {
+            List<Term> l = new ArrayList<Term>(2 * size());
+            for (Map.Entry<Term, Term> entry : entrySet())
+            {
+                l.add(entry.getKey());
+                l.add(entry.getValue());
+            }
+            return l;
+        }
+
+        public void validateType(CFDefinition.Name value) throws InvalidRequestException
+        {
+            if (!(value.type instanceof MapType))
+                throw new InvalidRequestException(String.format("Invalid value: %s is not a map", value.name));
+        }
+
+        public CollectionType.Function constructionFunction()
+        {
+            return CollectionType.Function.SET;
+        }
+    }
+
+    public static class ListLiteral extends ArrayList<Term> implements CollectionLiteral
+    {
+        public List<Term> asList()
+        {
+            return this;
+        }
+
+        public void validateType(CFDefinition.Name value) throws InvalidRequestException
+        {
+            if (!(value.type instanceof ListType))
+                throw new InvalidRequestException(String.format("Invalid value: %s is not a list", value.name));
+        }
+
+        public CollectionType.Function constructionFunction()
+        {
+            return CollectionType.Function.APPEND;
+        }
+    }
+
+    public static class SetLiteral extends HashSet<Term> implements CollectionLiteral
+    {
+        public List<Term> asList()
+        {
+            return new ArrayList<Term>(this);
+        }
+
+        public void validateType(CFDefinition.Name value) throws InvalidRequestException
+        {
+            // The parser cannot tell an empty set from an empty map and always returns an empty set,
+            // so an empty literal is accepted for a map column; anything else must target a set column.
+            boolean emptyMapLiteral = (value.type instanceof MapType) && isEmpty();
+            if (!emptyMapLiteral && !(value.type instanceof SetType))
+                throw new InvalidRequestException(String.format("Invalid value: %s is not a set", value.name));
+        }
+
+        public CollectionType.Function constructionFunction()
+        {
+            return CollectionType.Function.ADD;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index bb74bd4..26ad6cc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -17,11 +17,14 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
 import org.apache.cassandra.service.MigrationManager;
@@ -37,11 +40,11 @@ public class AlterTableStatement extends SchemaAlteringStatement
     }
 
     public final Type oType;
-    public final String validator;
+    public final ParsedType validator;
     public final ColumnIdentifier columnName;
     private final CFPropDefs cfProps = new CFPropDefs();
 
-    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, String validator, Map<String, String> propertyMap)
+    public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, ParsedType validator, Map<String, String> propertyMap)
     {
         super(name);
         this.oType = type;
@@ -73,8 +76,26 @@ public class AlterTableStatement extends SchemaAlteringStatement
                             throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
                     }
                 }
+
+                AbstractType<?> type = validator.getType();
+                if (type instanceof CollectionType)
+                {
+                    if (!cfDef.isComposite)
+                        throw new InvalidRequestException("Cannot use collection types with non-composite PRIMARY KEY");
+
+                    Map<ByteBuffer, CollectionType> collections = cfDef.hasCollections
+                                                                ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
+                                                                : new HashMap<ByteBuffer, CollectionType>();
+                    ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
+                    List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
+                    if (cfDef.hasCollections)
+                        ctypes.set(ctypes.size() - 1, newColType);
+                    else
+                        ctypes.add(newColType);
+                    cfm.comparator = CompositeType.getInstance(ctypes);
+                }
                 cfm.addColumnDefinition(new ColumnDefinition(columnName.key,
-                                                             CFPropDefs.parseType(validator),
+                                                             validator.getType(),
                                                              null,
                                                              null,
                                                              null,
@@ -88,7 +109,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
                 switch (name.kind)
                 {
                     case KEY_ALIAS:
-                        AbstractType<?> newType = CFPropDefs.parseType(validator);
+                        AbstractType<?> newType = validator.getType();
                         if (newType instanceof CounterColumnType)
                             throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", columnName));
                         cfm.keyValidator(newType);
@@ -97,16 +118,16 @@ public class AlterTableStatement extends SchemaAlteringStatement
                         assert cfDef.isComposite;
 
                         List<AbstractType<?>> newTypes = new ArrayList<AbstractType<?>>(((CompositeType) cfm.comparator).types);
-                        newTypes.set(name.position, CFPropDefs.parseType(validator));
+                        newTypes.set(name.position, validator.getType());
 
                         cfm.comparator = CompositeType.getInstance(newTypes);
                         break;
                     case VALUE_ALIAS:
-                        cfm.defaultValidator(CFPropDefs.parseType(validator));
+                        cfm.defaultValidator(validator.getType());
                         break;
                     case COLUMN_METADATA:
                         ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
-                        column.setValidator(CFPropDefs.parseType(validator));
+                        column.setValidator(validator.getType());
                         cfm.addColumnDefinition(column);
                         break;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9ceb67d..cc30307 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
@@ -30,6 +31,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.RequestType;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -97,7 +99,7 @@ public class BatchStatement extends ModificationStatement
     }
 
     public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables)
-    throws InvalidRequestException
+    throws UnavailableException, TimeoutException, InvalidRequestException
     {
         Map<Pair<String, ByteBuffer>, RowAndCounterMutation> mutations = new HashMap<Pair<String, ByteBuffer>, RowAndCounterMutation>();
         for (ModificationStatement statement : statements)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
new file mode 100644
index 0000000..e365e81
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.Pair;
+
+public class ColumnGroupMap
+{
+    private final ByteBuffer[] fullPath;
+    private final Map<ByteBuffer, Value> map = new HashMap<ByteBuffer, Value>();
+
+    private ColumnGroupMap(ByteBuffer[] fullPath)
+    {
+        this.fullPath = fullPath;
+    }
+
+    private void add(ByteBuffer[] fullName, int idx, IColumn column)
+    {
+        ByteBuffer columnName = fullName[idx];
+        if (fullName.length == idx + 2)
+        {
+            // It's a collection
+            Value v = map.get(columnName);
+            if (v == null)
+            {
+                v = new Collection();
+                map.put(columnName, v);
+            }
+            assert v instanceof Collection;
+
+            ((Collection)v).add(Pair.create(fullName[idx + 1], column));
+        }
+        else
+        {
+            assert !map.containsKey(columnName);
+            map.put(columnName, new Simple(column));
+        }
+    }
+
+    public ByteBuffer getKeyComponent(int pos)
+    {
+        return fullPath[pos];
+    }
+
+    public IColumn getSimple(ByteBuffer key)
+    {
+        Value v = map.get(key);
+        if (v == null)
+            return null;
+
+        assert v instanceof Simple;
+        return ((Simple)v).column;
+    }
+
+    public List<Pair<ByteBuffer, IColumn>> getCollection(ByteBuffer key)
+    {
+        Value v = map.get(key);
+        if (v == null)
+            return null;
+
+        assert v instanceof Collection;
+        return (List<Pair<ByteBuffer, IColumn>>)v;
+    }
+
+    private interface Value {};
+
+    private static class Simple implements Value
+    {
+        public final IColumn column;
+
+        Simple(IColumn column)
+        {
+            this.column = column;
+        }
+    }
+
+    private static class Collection extends ArrayList<Pair<ByteBuffer, IColumn>> implements Value {}
+
+    public static class Builder
+    {
+        private final CompositeType composite;
+        private final int idx;
+        private ByteBuffer[] previous;
+
+        private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
+        private ColumnGroupMap currentGroup;
+
+        public Builder(CompositeType composite, boolean hasCollections)
+        {
+            this.composite = composite;
+            this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+        }
+
+        public void add(IColumn c)
+        {
+            if (c.isMarkedForDelete())
+                return;
+
+            ByteBuffer[] current = composite.split(c.name());
+
+            if (currentGroup == null)
+            {
+                currentGroup = new ColumnGroupMap(current);
+                currentGroup.add(current, idx, c);
+                previous = current;
+                return;
+            }
+
+            if (!isSameGroup(current))
+            {
+                groups.add(currentGroup);
+                currentGroup = new ColumnGroupMap(current);
+            }
+            currentGroup.add(current, idx, c);
+            previous = current;
+        }
+
+        /**
+         * For sparse composite, returns whether the column belongs to the same
+         * cqlRow as the previously added one, based on the full list of components
+         * in the name.
+         * Two columns belong together if they differ only by the last
+         * component.
+         */
+        private boolean isSameGroup(ByteBuffer[] c)
+        {
+            // CQL does not allow inserting columns that lack some components of
+            // the composite name for a sparse composite. Someone coming from thrift
+            // could hit that though. Since we have no way to handle this
+            // correctly, better to fail here and tell whoever hits it (if
+            // anyone ever does) to change the definition to a dense composite.
+            assert c.length >= idx && previous.length >= idx : "Sparse composite should not have partial column names";
+            for (int i = 0; i < idx; i++)
+            {
+                if (!c[i].equals(previous[i]))
+                    return false;
+            }
+            return true;
+        }
+
+        public List<ColumnGroupMap> groups()
+        {
+            if (currentGroup != null)
+            {
+                groups.add(currentGroup);
+                currentGroup = null;
+            }
+            return groups;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 1ad1007..45a55da 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.ReversedType;
 import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -68,7 +70,14 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
     private Map<ByteBuffer, ColumnDefinition> getColumns() throws InvalidRequestException
     {
         Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
-        Integer componentIndex = comparator instanceof CompositeType ? ((CompositeType)comparator).types.size() - 1 : null;
+        Integer componentIndex = null;
+        if (comparator instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType) comparator;
+            componentIndex = ct.types.get(ct.types.size() - 1) instanceof ColumnToCollectionType
+                           ? ct.types.size() - 2
+                           : ct.types.size() - 1;
+        }
 
         for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
         {
@@ -123,7 +132,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
 
     public static class RawStatement extends CFStatement
     {
-        private final Map<ColumnIdentifier, String> definitions = new HashMap<ColumnIdentifier, String>();
+        private final Map<ColumnIdentifier, ParsedType> definitions = new HashMap<ColumnIdentifier, ParsedType>();
         private final CFPropDefs properties = new CFPropDefs();
 
         private final List<ColumnIdentifier> keyAliases = new ArrayList<ColumnIdentifier>();
@@ -159,10 +168,19 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
 
                 CreateColumnFamilyStatement stmt = new CreateColumnFamilyStatement(cfName, properties);
                 stmt.setBoundTerms(getBoundsTerms());
-                for (Map.Entry<ColumnIdentifier, String> entry : definitions.entrySet())
+
+                Map<ByteBuffer, CollectionType> definedCollections = null;
+                for (Map.Entry<ColumnIdentifier, ParsedType> entry : definitions.entrySet())
                 {
-                    AbstractType<?> type = CFPropDefs.parseType(entry.getValue());
-                    stmt.columns.put(entry.getKey(), type); // we'll remove what is not a column below
+                    ColumnIdentifier id = entry.getKey();
+                    ParsedType pt = entry.getValue();
+                    if (pt.isCollection())
+                    {
+                        if (definedCollections == null)
+                            definedCollections = new HashMap<ByteBuffer, CollectionType>();
+                        definedCollections.put(id.key, (CollectionType)pt.getType());
+                    }
+                    stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
                 }
 
                 // Ensure that exactly one key has been specified.
@@ -183,6 +201,8 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
                     // standard "dynamic" CF, otherwise it's a composite
                     if (useCompactStorage && columnAliases.size() == 1)
                     {
+                        if (definedCollections != null)
+                            throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
                         stmt.columnAliases.add(columnAliases.get(0).key);
                         stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
                         if (stmt.comparator instanceof CounterColumnType)
@@ -200,9 +220,20 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
                                 throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
                             types.add(type);
                         }
-                        // For sparse, we must add the last UTF8 component
-                        if (!useCompactStorage)
+
+                        if (useCompactStorage)
+                        {
+                            if (definedCollections != null)
+                                throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
+                        }
+                        else
+                        {
+                            // For sparse, we must add the last UTF8 component
+                            // and the collection type if there is one
                             types.add(CFDefinition.definitionType);
+                            if (definedCollections != null)
+                                types.add(ColumnToCollectionType.getInstance(definedCollections));
+                        }
 
                         if (types.isEmpty())
                             throw new IllegalStateException("Nonsensical empty parameter list for CompositeType");
@@ -212,9 +243,15 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
                 else
                 {
                     if (useCompactStorage)
+                    {
+                        if (definedCollections != null)
+                            throw new InvalidRequestException("Collection types are not supported with non composite PRIMARY KEY");
                         stmt.comparator = CFDefinition.definitionType;
+                    }
                     else
+                    {
                         stmt.comparator = CompositeType.getInstance(Collections.<AbstractType<?>>singletonList(CFDefinition.definitionType));
+                    }
                 }
 
                 if (stmt.columns.isEmpty())
@@ -250,12 +287,14 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
             AbstractType type = columns.get(t);
             if (type == null)
                 throw new InvalidRequestException(String.format("Unkown definition %s referenced in PRIMARY KEY", t));
+            if (type instanceof CollectionType)
+                throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
             columns.remove(t);
             Boolean isReversed = definedOrdering.get(t);
             return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
         }
 
-        public void addDefinition(ColumnIdentifier def, String type)
+        public void addDefinition(ColumnIdentifier def, ParsedType type)
         {
             definedNames.add(def);
             definitions.put(def, type);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 2b3c114..eb96417 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -18,12 +18,8 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
@@ -33,9 +29,15 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * A <code>DELETE</code> parsed from a CQL query statement.
@@ -43,20 +45,23 @@ import org.apache.cassandra.thrift.ThriftValidation;
 public class DeleteStatement extends ModificationStatement
 {
     private CFDefinition cfDef;
-    private final List<ColumnIdentifier> columns;
+    private final List<Selector> columns;
     private final List<Relation> whereClause;
 
+    private final List<Pair<CFDefinition.Name, Term>> toRemove;
+
     private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
 
-    public DeleteStatement(CFName name, List<ColumnIdentifier> columns, List<Relation> whereClause, Attributes attrs)
+    public DeleteStatement(CFName name, List<Selector> columns, List<Relation> whereClause, Attributes attrs)
     {
         super(name, attrs);
 
         this.columns = columns;
         this.whereClause = whereClause;
+        this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size());
     }
 
-    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws InvalidRequestException
+    public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws UnavailableException, TimeoutException, InvalidRequestException
     {
         // Check key
         List<Term> keys = processedKeys.get(cfDef.key.name);
@@ -83,79 +88,109 @@ public class DeleteStatement extends ModificationStatement
             }
         }
 
-        List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+        boolean fullKey = builder.componentCount() == cfDef.columns.size();
+        boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || toRemove.isEmpty());
 
-        for (Term key : keys)
+        if (!toRemove.isEmpty() && isRange)
+            throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", firstEmpty, toRemove.iterator().next().left));
+
+        // The list DISCARD operation requires reading the existing rows first. Do that now.
+        boolean needsReading = false;
+        for (Pair<CFDefinition.Name, Term> p : toRemove)
         {
-            ByteBuffer rawKey = key.getByteBuffer(cfDef.key.type, variables);
-            rowMutations.add(mutationForKey(cfDef, clientState, rawKey, builder, firstEmpty, variables));
+            CFDefinition.Name name = p.left;
+            Term value = p.right;
+
+            if ((name.type instanceof ListType) && value != null)
+            {
+                needsReading = true;
+                break;
+            }
         }
 
+        List<ByteBuffer> rawKeys = new ArrayList<ByteBuffer>(keys.size());
+        for (Term key: keys)
+            rawKeys.add(key.getByteBuffer(cfDef.key.type, variables));
+
+        Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(rawKeys, builder, (CompositeType)cfDef.cfm.comparator) : null;
+
+        List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+        UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
+
+        for (ByteBuffer key : rawKeys)
+            rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, rows == null ? null : rows.get(key)));
+
         return rowMutations;
     }
 
-    public RowMutation mutationForKey(CFDefinition cfDef, ClientState clientState, ByteBuffer key, ColumnNameBuilder builder, CFDefinition.Name firstEmpty, List<ByteBuffer> variables)
+    public RowMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, boolean isRange, UpdateParameters params, ColumnGroupMap group)
     throws InvalidRequestException
     {
         QueryProcessor.validateKey(key);
         RowMutation rm = new RowMutation(cfDef.cfm.ksName, key);
         ColumnFamily cf = rm.addOrGet(columnFamily());
-        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
 
         if (columns.isEmpty() && builder.componentCount() == 0)
         {
             // No columns, delete the row
-            cf.delete(new DeletionInfo(getTimestamp(clientState), localDeleteTime));
+            cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
         }
         else
         {
-            boolean fullKey = builder.componentCount() == cfDef.columns.size();
-            boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || columns.isEmpty());
-
-            if (!columns.isEmpty())
-            {
-                if (isRange)
-                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", firstEmpty, columns.iterator().next()));
-
-                for (ColumnIdentifier column : columns)
-                {
-                    CFDefinition.Name name = cfDef.get(column);
-                    if (name == null)
-                        throw new InvalidRequestException(String.format("Unknown identifier %s", column));
-
-                    // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
-                    // list. However, we support having the value name for coherence with the static/sparse case
-                    if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
-                        throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", column));
-                }
-            }
-
             if (isRange)
             {
                 ByteBuffer start = builder.copy().build();
                 ByteBuffer end = builder.buildAsEndOfRange();
                 QueryProcessor.validateColumnName(start); // If start is good, end is too
-                cf.delete(new DeletionInfo(start, end, cfDef.cfm.comparator, getTimestamp(clientState), localDeleteTime));
+                cf.addAtom(params.makeRangeTombstone(start, end));
             }
             else
             {
                 // Delete specific columns
                 if (cfDef.isCompact)
                 {
-                        ByteBuffer columnName = builder.build();
-                        QueryProcessor.validateColumnName(columnName);
-                        cf.addTombstone(columnName, localDeleteTime, getTimestamp(clientState));
+                    ByteBuffer columnName = builder.build();
+                    QueryProcessor.validateColumnName(columnName);
+                    cf.addColumn(params.makeTombstone(columnName));
                 }
                 else
                 {
-                    Iterator<ColumnIdentifier> iter = columns.iterator();
+                    Iterator<Pair<CFDefinition.Name, Term>> iter = toRemove.iterator();
                     while (iter.hasNext())
                     {
-                        ColumnIdentifier column = iter.next();
-                        ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
-                        ByteBuffer columnName = b.add(column.key).build();
-                        QueryProcessor.validateColumnName(columnName);
-                        cf.addTombstone(columnName, localDeleteTime, getTimestamp(clientState));
+                        Pair<CFDefinition.Name, Term> p = iter.next();
+                        CFDefinition.Name column = p.left;
+                        if (column.type instanceof CollectionType)
+                        {
+                            Term keySelected = p.right;
+                            if (keySelected == null)
+                            {
+                                // Delete the whole collection
+                                ByteBuffer start = builder.copy().add(column.name.key).build();
+                                QueryProcessor.validateColumnName(start);
+                                ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
+                                ByteBuffer end = b.add(column.name.key).buildAsEndOfRange();
+                                cf.addAtom(params.makeRangeTombstone(start, end));
+                            }
+                            else
+                            {
+                                builder.add(column.name.key);
+                                CollectionType.Function fct = CollectionType.Function.DISCARD_KEY;
+                                List<Term> args = Collections.singletonList(keySelected);
+
+                                if (column.type instanceof ListType)
+                                    ((ListType)column.type).execute(cf, builder, fct, args, params, group == null ? null : group.getCollection(column.name.key));
+                                else
+                                    ((CollectionType)column.type).executeFunction(cf, builder, fct, args, params);
+                            }
+                        }
+                        else
+                        {
+                            ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
+                            ByteBuffer columnName = b.add(column.name.key).build();
+                            QueryProcessor.validateColumnName(columnName);
+                            cf.addColumn(params.makeTombstone(columnName));
+                        }
                     }
                 }
             }
@@ -169,6 +204,24 @@ public class DeleteStatement extends ModificationStatement
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
         cfDef = metadata.getCfDef();
         UpdateStatement.processKeys(cfDef, whereClause, processedKeys, boundNames);
+
+        for (Selector column : columns)
+        {
+            CFDefinition.Name name = cfDef.get(column.id());
+            if (name == null)
+                throw new InvalidRequestException(String.format("Unknown identifier %s", column));
+
+            // For compact, we only have one value except the key, so the only form of DELETE that makes sense is without a column
+            // list. However, we support having the value name for coherence with the static/sparse case
+            if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+                throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", column));
+
+            if (column.key() != null && !(name.type instanceof ListType || name.type instanceof MapType))
+                throw new InvalidRequestException(String.format("Invalid selection %s since %s is neither a list or a map", column, column.id()));
+
+            toRemove.add(Pair.create(name, column.key()));
+        }
+
         return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 34fffd4..353115a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -17,15 +17,19 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.io.IOError;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.ConsistencyLevel;
@@ -75,8 +79,15 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
 
     public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel());
-        return null;
+        try
+        {
+            StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel());
+            return null;
+        }
+        catch (TimeoutException e)
+        {
+            throw new TimedOutException();
+        }
     }
 
     public ConsistencyLevel getConsistencyLevel()
@@ -109,6 +120,46 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
         return timeToLive;
     }
 
+    public Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite) throws UnavailableException, TimeoutException, InvalidRequestException
+    {
+        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+        for (ByteBuffer key : keys)
+        {
+            commands.add(new SliceFromReadCommand(keyspace(),
+                                                  key,
+                                                  new QueryPath(columnFamily()),
+                                                  builder.copy().build(),
+                                                  builder.copy().buildAsEndOfRange(),
+                                                  false,
+                                                  Integer.MAX_VALUE));
+        }
+
+        try
+        {
+            List<Row> rows = StorageProxy.read(commands, getConsistencyLevel());
+            Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+            for (Row row : rows)
+            {
+                if (row.cf == null || row.cf.isEmpty())
+                    continue;
+
+                ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+                for (IColumn column : row.cf)
+                    groupBuilder.add(column);
+
+                List<ColumnGroupMap> groups = groupBuilder.groups();
+                assert groups.isEmpty() || groups.size() == 1;
+                if (!groups.isEmpty())
+                    map.put(row.key.key, groups.get(0));
+            }
+            return map;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
     /**
      * Convert statement into a list of mutations to apply on the server
      *
@@ -118,7 +169,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    public abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws InvalidRequestException;
+    public abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws UnavailableException, TimeoutException, InvalidRequestException;
 
     public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;
 }