You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/05/22 21:20:23 UTC

[3/6] git commit: Support multi-row selects within a partition using IN

Support multi-row selects within a partition using IN

Patch by Tyler Hobbs; review by Sylvain Lebresne for CASSANDRA-6875


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/43496384
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/43496384
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/43496384

Branch: refs/heads/cassandra-2.1
Commit: 43496384d404f2fa0af943003f2dc8fdfced4073
Parents: 2635632
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Thu May 22 14:04:48 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Thu May 22 14:04:48 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |    2 +-
 .../apache/cassandra/cql/SelectStatement.java   |    1 -
 .../apache/cassandra/cql3/AbstractMarker.java   |   10 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |    2 +-
 .../org/apache/cassandra/cql3/Constants.java    |    6 +
 src/java/org/apache/cassandra/cql3/Cql.g        |  107 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |   13 +-
 .../cassandra/cql3/MultiColumnRelation.java     |  144 +++
 .../apache/cassandra/cql3/QueryProcessor.java   |    2 +-
 .../org/apache/cassandra/cql3/Relation.java     |  107 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |    1 -
 .../cassandra/cql3/SingleColumnRelation.java    |   95 ++
 src/java/org/apache/cassandra/cql3/Term.java    |   10 +
 src/java/org/apache/cassandra/cql3/Tuples.java  |  349 ++++++
 .../statements/AuthenticationStatement.java     |    2 +-
 .../cql3/statements/AuthorizationStatement.java |    2 +-
 .../cql3/statements/BatchStatement.java         |    4 +-
 .../cql3/statements/ModificationStatement.java  |   25 +-
 .../cql3/statements/MultiColumnRestriction.java |  135 +++
 .../cassandra/cql3/statements/Restriction.java  |  287 +----
 .../statements/SchemaAlteringStatement.java     |    2 +-
 .../cql3/statements/SelectStatement.java        |  829 ++++++++-----
 .../statements/SingleColumnRestriction.java     |  300 +++++
 .../cql3/statements/TruncateStatement.java      |    2 +-
 .../cassandra/cql3/statements/UseStatement.java |    2 +-
 .../apache/cassandra/db/marshal/TupleType.java  |  279 +++++
 .../cassandra/cql3/MultiColumnRelationTest.java | 1112 ++++++++++++++++++
 27 files changed, 3138 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bddb1d1..c6c51c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.9
  * Add missing iso8601 patterns for date strings (6973)
-
+ * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
 
 2.0.8
  * Always reallocate buffers in HSHA (CASSANDRA-6285)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/SelectStatement.java b/src/java/org/apache/cassandra/cql/SelectStatement.java
index 1126738..7dd5592 100644
--- a/src/java/org/apache/cassandra/cql/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql/SelectStatement.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.cql;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/AbstractMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/AbstractMarker.java b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
index 165cb00..4329ed9 100644
--- a/src/java/org/apache/cassandra/cql3/AbstractMarker.java
+++ b/src/java/org/apache/cassandra/cql3/AbstractMarker.java
@@ -21,7 +21,6 @@ import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
-
 /**
  * A single bind marker.
  */
@@ -84,7 +83,12 @@ public abstract class AbstractMarker extends Term.NonTerminal
         }
     }
 
-    // A raw that stands for multiple values, i.e. when we have 'IN ?'
+    /**
+     * A raw placeholder for multiple values of the same type for a single column.
+     * For example, "SELECT ... WHERE user_id IN ?".
+     *
+     * Because a single type is used, a List is used to represent the values.
+     */
     public static class INRaw extends Raw
     {
         public INRaw(int bindIndex)
@@ -102,7 +106,7 @@ public abstract class AbstractMarker extends Term.NonTerminal
         public AbstractMarker prepare(ColumnSpecification receiver) throws InvalidRequestException
         {
             if (receiver.type instanceof CollectionType)
-                throw new InvalidRequestException("Invalid IN relation on collection column");
+                throw new InvalidRequestException("Collection columns do not support IN relations");
 
             return new Lists.Marker(bindIndex, makeInReceiver(receiver));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 81cd2b2..a1642ef 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -57,5 +57,5 @@ public interface CQLStatement
      *
      * @param state the current query state
      */
-    public ResultMessage executeInternal(QueryState state) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index f99fd02..4ea6c2d 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -254,6 +254,12 @@ public abstract class Constants
         {
             return bytes;
         }
+
+        @Override
+        public String toString()
+        {
+            return ByteBufferUtil.bytesToHex(bytes);
+        }
     }
 
     public static class Marker extends AbstractMarker

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 04f9f59..ceb2bde 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -67,6 +67,20 @@ options {
         return marker;
     }
 
+    public Tuples.Raw newTupleBindVariables(ColumnIdentifier name)
+    {
+        Tuples.Raw marker = new Tuples.Raw(bindVariables.size());
+        bindVariables.add(name);
+        return marker;
+    }
+
+    public Tuples.INRaw newTupleINBindVariables(ColumnIdentifier name)
+    {
+        Tuples.INRaw marker = new Tuples.INRaw(bindVariables.size());
+        bindVariables.add(name);
+        return marker;
+    }
+
     public void displayRecognitionError(String[] tokenNames, RecognitionException e)
     {
         String hdr = getErrorHeader(e);
@@ -880,38 +894,79 @@ relationType returns [Relation.Type op]
     ;
 
 relation[List<Relation> clauses]
-    : name=cident type=relationType t=term { $clauses.add(new Relation(name, type, t)); }
-    | K_TOKEN 
-        { List<ColumnIdentifier> l = new ArrayList<ColumnIdentifier>(); }
-          '(' name1=cident { l.add(name1); } ( ',' namen=cident { l.add(namen); })* ')'
-        type=relationType t=term
+    : name=cident type=relationType t=term { $clauses.add(new SingleColumnRelation(name, type, t)); }
+    | K_TOKEN l=tupleOfIdentifiers type=relationType t=term
         {
             for (ColumnIdentifier id : l)
-                $clauses.add(new Relation(id, type, t, true));
+                $clauses.add(new SingleColumnRelation(id, type, t, true));
         }
-    | name=cident K_IN { Term.Raw marker = null; } (QMARK { marker = newINBindVariables(null); } | ':' mid=cident { marker = newINBindVariables(mid); })
-        { $clauses.add(new Relation(name, Relation.Type.IN, marker)); }
-    | name=cident K_IN { Relation rel = Relation.createInRelation($name.id); }
-       '(' ( f1=term { rel.addInValue(f1); } (',' fN=term { rel.addInValue(fN); } )* )? ')' { $clauses.add(rel); }
-    | {
-         List<ColumnIdentifier> ids = new ArrayList<ColumnIdentifier>();
-         List<Term.Raw> terms = new ArrayList<Term.Raw>();
-      }
-        '(' n1=cident { ids.add(n1); } (',' ni=cident { ids.add(ni); })* ')'
-        type=relationType
-        '(' t1=term { terms.add(t1); } (',' ti=term { terms.add(ti); })* ')'
-      {
-          if (type == Relation.Type.IN)
-              addRecognitionError("Cannot use IN relation with tuple notation");
-          if (ids.size() != terms.size())
-              addRecognitionError(String.format("Number of values (" + terms.size() + ") in tuple notation doesn't match the number of column names (" + ids.size() + ")"));
-          else
-              for (int i = 0; i < ids.size(); i++)
-                  $clauses.add(new Relation(ids.get(i), type, terms.get(i), i == 0 ? null : ids.get(i-1)));
-      }
+    | name=cident K_IN marker=inMarker
+        { $clauses.add(new SingleColumnRelation(name, Relation.Type.IN, marker)); }
+    | name=cident K_IN inValues=singleColumnInValues
+        { $clauses.add(SingleColumnRelation.createInRelation($name.id, inValues)); }
+    | ids=tupleOfIdentifiers
+      ( K_IN
+          ( '(' ')'
+              { $clauses.add(MultiColumnRelation.createInRelation(ids, new ArrayList<Tuples.Literal>())); }
+          | tupleInMarker=inMarkerForTuple /* (a, b, c) IN ? */
+              { $clauses.add(MultiColumnRelation.createSingleMarkerInRelation(ids, tupleInMarker)); }
+          | literals=tupleOfTupleLiterals /* (a, b, c) IN ((1, 2, 3), (4, 5, 6), ...) */
+              {
+                  $clauses.add(MultiColumnRelation.createInRelation(ids, literals));
+              }
+          | markers=tupleOfMarkersForTuples /* (a, b, c) IN (?, ?, ...) */
+              { $clauses.add(MultiColumnRelation.createInRelation(ids, markers)); }
+          )
+      | type=relationType literal=tupleLiteral /* (a, b, c) > (1, 2, 3) or (a, b, c) > (?, ?, ?) */
+          {
+              $clauses.add(MultiColumnRelation.createNonInRelation(ids, type, literal));
+          }
+      | type=relationType tupleMarker=markerForTuple /* (a, b, c) >= ? */
+          { $clauses.add(MultiColumnRelation.createNonInRelation(ids, type, tupleMarker)); }
+      )
     | '(' relation[$clauses] ')'
     ;
 
+inMarker returns [AbstractMarker.INRaw marker]
+    : QMARK { $marker = newINBindVariables(null); }
+    | ':' name=cident { $marker = newINBindVariables(name); }
+    ;
+
+tupleOfIdentifiers returns [List<ColumnIdentifier> ids]
+    @init { $ids = new ArrayList<ColumnIdentifier>(); }
+    : '(' n1=cident { $ids.add(n1); } (',' ni=cident { $ids.add(ni); })* ')'
+    ;
+
+singleColumnInValues returns [List<Term.Raw> terms]
+    @init { $terms = new ArrayList<Term.Raw>(); }
+    : '(' ( t1 = term { $terms.add(t1); } (',' ti=term { $terms.add(ti); })* )? ')'
+    ;
+
+tupleLiteral returns [Tuples.Literal literal]
+    @init { List<Term.Raw> terms = new ArrayList<>(); }
+    : '(' t1=term { terms.add(t1); } (',' ti=term { terms.add(ti); })* ')' { $literal = new Tuples.Literal(terms); }
+    ;
+
+tupleOfTupleLiterals returns [List<Tuples.Literal> literals]
+    @init { $literals = new ArrayList<>(); }
+    : '(' t1=tupleLiteral { $literals.add(t1); } (',' ti=tupleLiteral { $literals.add(ti); })* ')'
+    ;
+
+markerForTuple returns [Tuples.Raw marker]
+    : QMARK { $marker = newTupleBindVariables(null); }
+    | ':' name=cident { $marker = newTupleBindVariables(name); }
+    ;
+
+tupleOfMarkersForTuples returns [List<Tuples.Raw> markers]
+    @init { $markers = new ArrayList<Tuples.Raw>(); }
+    : '(' m1=markerForTuple { $markers.add(m1); } (',' mi=markerForTuple { $markers.add(mi); })* ')'
+    ;
+
+inMarkerForTuple returns [Tuples.INRaw marker]
+    : QMARK { $marker = newTupleINBindVariables(null); }
+    | ':' name=cident { $marker = newTupleINBindVariables(name); }
+    ;
+
 comparatorType returns [CQL3Type t]
     : c=native_type     { $t = c; }
     | c=collection_type { $t = c; }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 4ad39db..d483dd5 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -117,7 +117,7 @@ public abstract class Lists
         }
     }
 
-    public static class Value extends Term.Terminal
+    public static class Value extends Term.MultiItemTerminal
     {
         public final List<ByteBuffer> elements;
 
@@ -148,9 +148,14 @@ public abstract class Lists
         {
             return CollectionType.pack(elements, elements.size());
         }
+
+        public List<ByteBuffer> getElements()
+        {
+            return elements;
+        }
     }
 
-    /*
+    /**
      * Basically similar to a Value, but with some non-pure function (that need
      * to be evaluated at execution time) in it.
      *
@@ -200,6 +205,9 @@ public abstract class Lists
         }
     }
 
+    /**
+     * A marker for List values and IN relations
+     */
     public static class Marker extends AbstractMarker
     {
         protected Marker(int bindIndex, ColumnSpecification receiver)
@@ -212,7 +220,6 @@ public abstract class Lists
         {
             ByteBuffer value = values.get(bindIndex);
             return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type);
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
new file mode 100644
index 0000000..fda60df
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.List;
+
+/**
+ * A relation using the tuple notation, which typically affects multiple columns.
+ * Examples:
+ *  - SELECT ... WHERE (a, b, c) > (1, 'a', 10)
+ *  - SELECT ... WHERE (a, b, c) IN ((1, 2, 3), (4, 5, 6))
+ *  - SELECT ... WHERE (a, b) < ?
+ *  - SELECT ... WHERE (a, b) IN ?
+ */
+public class MultiColumnRelation extends Relation
+{
+    private final List<ColumnIdentifier> entities;
+
+    /** A Tuples.Literal or Tuples.Raw marker */
+    private final Term.MultiColumnRaw valuesOrMarker;
+
+    /** A list of Tuples.Literal or Tuples.Raw markers */
+    private final List<? extends Term.MultiColumnRaw> inValues;
+
+    private final Tuples.INRaw inMarker;
+
+    private MultiColumnRelation(List<ColumnIdentifier> entities, Type relationType, Term.MultiColumnRaw valuesOrMarker, List<? extends Term.MultiColumnRaw> inValues, Tuples.INRaw inMarker)
+    {
+        this.entities = entities;
+        this.relationType = relationType;
+        this.valuesOrMarker = valuesOrMarker;
+
+        this.inValues = inValues;
+        this.inMarker = inMarker;
+    }
+
+    /**
+     * Creates a multi-column EQ, LT, LTE, GT, or GTE relation.
+     * For example: "SELECT ... WHERE (a, b) > (0, 1)"
+     * @param entities the columns on the LHS of the relation
+     * @param relationType the relation operator
+     * @param valuesOrMarker a Tuples.Literal instance or a Tuples.Raw marker
+     */
+    public static MultiColumnRelation createNonInRelation(List<ColumnIdentifier> entities, Type relationType, Term.MultiColumnRaw valuesOrMarker)
+    {
+        assert relationType != Relation.Type.IN;
+        return new MultiColumnRelation(entities, relationType, valuesOrMarker, null, null);
+    }
+
+    /**
+     * Creates a multi-column IN relation with a list of IN values or markers.
+     * For example: "SELECT ... WHERE (a, b) IN ((0, 1), (2, 3))"
+     * @param entities the columns on the LHS of the relation
+     * @param inValues a list of Tuples.Literal instances or Tuples.Raw markers
+     */
+    public static MultiColumnRelation createInRelation(List<ColumnIdentifier> entities, List<? extends Term.MultiColumnRaw> inValues)
+    {
+        return new MultiColumnRelation(entities, Relation.Type.IN, null, inValues, null);
+    }
+
+    /**
+     * Creates a multi-column IN relation with a marker for the IN values.
+     * For example: "SELECT ... WHERE (a, b) IN ?"
+     * @param entities the columns on the LHS of the relation
+     * @param inMarker a single IN marker
+     */
+    public static MultiColumnRelation createSingleMarkerInRelation(List<ColumnIdentifier> entities, Tuples.INRaw inMarker)
+    {
+        return new MultiColumnRelation(entities, Relation.Type.IN, null, null, inMarker);
+    }
+
+    public List<ColumnIdentifier> getEntities()
+    {
+        return entities;
+    }
+
+    /**
+     * For non-IN relations, returns the Tuples.Literal or Tuples.Raw marker for a single tuple.
+     */
+    public Term.MultiColumnRaw getValue()
+    {
+        assert relationType != Relation.Type.IN;
+        return valuesOrMarker;
+    }
+
+    /**
+     * For IN relations, returns the list of Tuples.Literal instances or Tuples.Raw markers.
+     * If a single IN marker was used, this will return null.
+     */
+    public List<? extends Term.MultiColumnRaw> getInValues()
+    {
+
+        return inValues;
+    }
+
+    /**
+     * For IN relations, returns the single marker for the IN values if there is one, otherwise null.
+     */
+    public Tuples.INRaw getInMarker()
+    {
+        return inMarker;
+    }
+
+    public boolean isMultiColumn()
+    {
+        return true;
+    }
+
+    @Override
+    public String toString()
+    {
+        if (relationType == Type.IN)
+        {
+            StringBuilder sb = new StringBuilder(Tuples.tupleToString(entities));
+            sb.append(" IN ");
+            sb.append(inMarker != null ? '?' : Tuples.tupleToString(inValues));
+            return sb.toString();
+        }
+        else
+        {
+            StringBuilder sb = new StringBuilder(Tuples.tupleToString(entities));
+            sb.append(" ");
+            sb.append(relationType);
+            sb.append(" ");
+            sb.append(valuesOrMarker);
+            return sb.toString();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 15ee59f..30d1bd7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -205,7 +205,7 @@ public class QueryProcessor implements QueryHandler
             state.setKeyspace(Keyspace.SYSTEM_KS);
             CQLStatement statement = getStatement(query, state).statement;
             statement.validate(state);
-            ResultMessage result = statement.executeInternal(qState);
+            ResultMessage result = statement.executeInternal(qState, QueryOptions.DEFAULT);
             if (result instanceof ResultMessage.Rows)
                 return new UntypedResultSet(((ResultMessage.Rows)result).result);
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java
index 9d065bf..0f1366d 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -17,65 +17,35 @@
  */
 package org.apache.cassandra.cql3;
 
-import java.util.ArrayList;
-import java.util.List;
+public abstract class Relation {
 
-/**
- * Relations encapsulate the relationship between an entity of some kind, and
- * a value (term). For example, <key> > "start" or "colname1" = "somevalue".
- *
- */
-public class Relation
-{
-    private final ColumnIdentifier entity;
-    private final Type relationType;
-    private final Term.Raw value;
-    private final List<Term.Raw> inValues;
-    public final boolean onToken;
-
-    // Will be null unless for tuple notations (#4851)
-    public final ColumnIdentifier previousInTuple;
+    protected Type relationType;
 
     public static enum Type
     {
         EQ, LT, LTE, GTE, GT, IN;
-    }
-
-    private Relation(ColumnIdentifier entity, Type type, Term.Raw value, List<Term.Raw> inValues, boolean onToken, ColumnIdentifier previousInTuple)
-    {
-        this.entity = entity;
-        this.relationType = type;
-        this.value = value;
-        this.inValues = inValues;
-        this.onToken = onToken;
-        this.previousInTuple = previousInTuple;
-    }
-
-    /**
-     * Creates a new relation.
-     *
-     * @param entity the kind of relation this is; what the term is being compared to.
-     * @param type the type that describes how this entity relates to the value.
-     * @param value the value being compared.
-     */
-    public Relation(ColumnIdentifier entity, Type type, Term.Raw value)
-    {
-        this(entity, type, value, null, false, null);
-    }
-
-    public Relation(ColumnIdentifier entity, Type type, Term.Raw value, boolean onToken)
-    {
-        this(entity, type, value, null, onToken, null);
-    }
 
-    public Relation(ColumnIdentifier entity, Type type, Term.Raw value, ColumnIdentifier previousInTuple)
-    {
-        this(entity, type, value, null, false, previousInTuple);
-    }
-
-    public static Relation createInRelation(ColumnIdentifier entity)
-    {
-        return new Relation(entity, Type.IN, null, new ArrayList<Term.Raw>(), false, null);
+        @Override
+        public String toString()
+        {
+            switch (this)
+            {
+                case EQ:
+                    return "=";
+                case LT:
+                    return "<";
+                case LTE:
+                    return "<=";
+                case GT:
+                    return ">";
+                case GTE:
+                    return ">=";
+                case IN:
+                    return "IN";
+                default:
+                    return this.name();
+            }
+        }
     }
 
     public Type operator()
@@ -83,34 +53,5 @@ public class Relation
         return relationType;
     }
 
-    public ColumnIdentifier getEntity()
-    {
-        return entity;
-    }
-
-    public Term.Raw getValue()
-    {
-        assert relationType != Type.IN || value == null || value instanceof AbstractMarker.INRaw;
-        return value;
-    }
-
-    public List<Term.Raw> getInValues()
-    {
-        assert relationType == Type.IN;
-        return inValues;
-    }
-
-    public void addInValue(Term.Raw t)
-    {
-        inValues.add(t);
-    }
-
-    @Override
-    public String toString()
-    {
-        if (relationType == Type.IN)
-            return String.format("%s IN %s", entity, inValues);
-        else
-            return String.format("%s %s %s", entity, relationType, value);
-    }
+    public abstract boolean isMultiColumn();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index e4f27f9..4cda0cd 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.transport.*;
 import org.apache.cassandra.db.marshal.AbstractType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
new file mode 100644
index 0000000..5464c23
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.List;
+
+/**
+ * Relations encapsulate the relationship between an entity of some kind, and
+ * a value (term). For example, <key> > "start" or "colname1" = "somevalue".
+ *
+ */
+public class SingleColumnRelation extends Relation
+{
+    private final ColumnIdentifier entity;
+    private final Term.Raw value;
+    private final List<Term.Raw> inValues;
+    public final boolean onToken;
+
+    private SingleColumnRelation(ColumnIdentifier entity, Type type, Term.Raw value, List<Term.Raw> inValues, boolean onToken)
+    {
+        this.entity = entity;
+        this.relationType = type;
+        this.value = value;
+        this.inValues = inValues;
+        this.onToken = onToken;
+    }
+
+    /**
+     * Creates a new relation.
+     *
+     * @param entity the kind of relation this is; what the term is being compared to.
+     * @param type the type that describes how this entity relates to the value.
+     * @param value the value being compared.
+     */
+    public SingleColumnRelation(ColumnIdentifier entity, Type type, Term.Raw value)
+    {
+        this(entity, type, value, null, false);
+    }
+
+    public SingleColumnRelation(ColumnIdentifier entity, Type type, Term.Raw value, boolean onToken)
+    {
+        this(entity, type, value, null, onToken);
+    }
+
+    public static SingleColumnRelation createInRelation(ColumnIdentifier entity, List<Term.Raw> inValues)
+    {
+        return new SingleColumnRelation(entity, Type.IN, null, inValues, false);
+    }
+
+    public ColumnIdentifier getEntity()
+    {
+        return entity;
+    }
+
+    public Term.Raw getValue()
+    {
+        assert relationType != Type.IN || value == null || value instanceof AbstractMarker.INRaw;
+        return value;
+    }
+
+    public List<Term.Raw> getInValues()
+    {
+        assert relationType == Type.IN;
+        return inValues;
+    }
+
+    public boolean isMultiColumn()
+    {
+        return false;
+    }
+
+    @Override
+    public String toString()
+    {
+        if (relationType == Type.IN)
+            return String.format("%s IN %s", entity, inValues);
+        else
+            return String.format("%s %s %s", entity, relationType, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d69fc33..96b4b71 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -91,6 +91,11 @@ public interface Term
         public Term prepare(ColumnSpecification receiver) throws InvalidRequestException;
     }
 
+    public interface MultiColumnRaw extends Raw
+    {
+        public Term prepare(List<? extends ColumnSpecification> receiver) throws InvalidRequestException;
+    }
+
     /**
      * A terminal term, one that can be reduced to a byte buffer directly.
      *
@@ -128,6 +133,11 @@ public interface Term
         }
     }
 
+    public abstract class MultiItemTerminal extends Terminal
+    {
+        public abstract List<ByteBuffer> getElements();
+    }
+
     /**
      * A non terminal term, i.e. a term that can only be reduce to a byte buffer
      * at execution time.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/Tuples.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Tuples.java b/src/java/org/apache/cassandra/cql3/Tuples.java
new file mode 100644
index 0000000..9e86912
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/Tuples.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Static helper methods and classes for tuples.
+ */
+public class Tuples
+{
+    private static final Logger logger = LoggerFactory.getLogger(Tuples.class);
+
+    /**
+     * A raw, literal tuple.  When prepared, this will become a Tuples.Value or Tuples.DelayedValue, depending
+     * on whether the tuple holds NonTerminals.
+     */
+    public static class Literal implements Term.MultiColumnRaw
+    {
+        private final List<Term.Raw> elements;
+
+        public Literal(List<Term.Raw> elements)
+        {
+            this.elements = elements;
+        }
+
+        public Term prepare(List<? extends ColumnSpecification> receivers) throws InvalidRequestException
+        {
+            if (elements.size() != receivers.size())
+                throw new InvalidRequestException(String.format("Expected %d elements in value tuple, but got %d: %s", receivers.size(), elements.size(), this));
+
+            List<Term> values = new ArrayList<>(elements.size());
+            boolean allTerminal = true;
+            for (int i = 0; i < elements.size(); i++)
+            {
+                Term t = elements.get(i).prepare(receivers.get(i));
+                if (t instanceof Term.NonTerminal)
+                    allTerminal = false;
+
+                values.add(t);
+            }
+            DelayedValue value = new DelayedValue(values);
+            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+        }
+
+        public Term prepare(ColumnSpecification receiver)
+        {
+            throw new AssertionError("Tuples.Literal instances require a list of receivers for prepare()");
+        }
+
+        public boolean isAssignableTo(ColumnSpecification receiver)
+        {
+            // tuples shouldn't be assignable to anything right now
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return tupleToString(elements);
+        }
+    }
+
+    /**
+     * A tuple of terminal values (e.g (123, 'abc')).
+     */
+    public static class Value extends Term.MultiItemTerminal
+    {
+        public final ByteBuffer[] elements;
+
+        public Value(ByteBuffer[] elements)
+        {
+            this.elements = elements;
+        }
+
+        public static Value fromSerialized(ByteBuffer bytes, TupleType type)
+        {
+            return new Value(type.split(bytes));
+        }
+
+        public ByteBuffer get()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public List<ByteBuffer> getElements()
+        {
+            return Arrays.asList(elements);
+        }
+    }
+
+    /**
+     * Similar to Value, but contains at least one NonTerminal, such as a non-pure function or a bind marker.
+     */
+    public static class DelayedValue extends Term.NonTerminal
+    {
+        public final List<Term> elements;
+
+        public DelayedValue(List<Term> elements)
+        {
+            this.elements = elements;
+        }
+
+        public boolean containsBindMarker()
+        {
+            for (Term term : elements)
+                if (term.containsBindMarker())
+                    return true;
+
+            return false;
+        }
+
+        public void collectMarkerSpecification(VariableSpecifications boundNames)
+        {
+            for (Term term : elements)
+                term.collectMarkerSpecification(boundNames);
+        }
+
+        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            ByteBuffer[] buffers = new ByteBuffer[elements.size()];
+            for (int i = 0; i < elements.size(); i++)
+            {
+                ByteBuffer bytes = elements.get(i).bindAndGet(values);
+                if (bytes == null)
+                    throw new InvalidRequestException("Tuples may not contain null values");
+                // Store the value that was just null-checked; binding again is wasteful and a non-pure function could differ.
+                buffers[i] = bytes;
+            }
+            return new Value(buffers);
+        }
+
+        @Override
+        public String toString()
+        {
+            return tupleToString(elements);
+        }
+    }
+
+    /**
+     * A terminal value for a list of IN values that are tuples. For example: "SELECT ... WHERE (a, b, c) IN ?"
+     * This is similar to Lists.Value, but allows us to keep components of the tuples in the list separate.
+     */
+    public static class InValue extends Term.Terminal
+    {
+        List<List<ByteBuffer>> elements;
+
+        public InValue(List<List<ByteBuffer>> items)
+        {
+            this.elements = items;
+        }
+
+        public static InValue fromSerialized(ByteBuffer value, ListType type) throws InvalidRequestException
+        {
+            try
+            {
+                // Collections have this small hack that validate cannot be called on a serialized object,
+                // but compose does the validation (so we're fine).
+                List<?> l = (List<?>)type.compose(value);
+
+                assert type.elements instanceof TupleType;
+                TupleType tupleType = (TupleType) type.elements;
+
+                // Re-serialize each decoded tuple and split it, keeping the tuple's components as separate buffers.
+                List<List<ByteBuffer>> elements = new ArrayList<>(l.size());
+                for (Object element : l)
+                    elements.add(Arrays.asList(tupleType.split(type.elements.decompose(element))));
+                return new InValue(elements);
+            }
+            catch (MarshalException e)
+            {
+                throw new InvalidRequestException(e.getMessage());
+            }
+        }
+
+        public ByteBuffer get()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public List<List<ByteBuffer>> getSplitValues()
+        {
+            return elements;
+        }
+    }
+
+    /**
+     * A raw placeholder for a tuple of values for multiple columns, each of which may have a different type.
+     * For example, "SELECT ... WHERE (col1, col2) > ?".
+     */
+    public static class Raw extends AbstractMarker.Raw implements Term.MultiColumnRaw
+    {
+        public Raw(int bindIndex)
+        {
+            super(bindIndex);
+        }
+
+        private static ColumnSpecification makeReceiver(List<? extends ColumnSpecification> receivers) throws InvalidRequestException
+        {
+            List<AbstractType<?>> types = new ArrayList<>(receivers.size());
+            StringBuilder inName = new StringBuilder("(");
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                ColumnSpecification receiver = receivers.get(i);
+                inName.append(receiver.name);
+                if (i < receivers.size() - 1)
+                    inName.append(",");
+                types.add(receiver.type);
+            }
+            inName.append(')');
+
+            ColumnIdentifier identifier = new ColumnIdentifier(inName.toString(), true);
+            TupleType type = new TupleType(types);
+            return new ColumnSpecification(receivers.get(0).ksName, receivers.get(0).cfName, identifier, type);
+        }
+
+        public AbstractMarker prepare(List<? extends ColumnSpecification> receivers) throws InvalidRequestException
+        {
+            return new Tuples.Marker(bindIndex, makeReceiver(receivers));
+        }
+
+        @Override
+        public AbstractMarker prepare(ColumnSpecification receiver)
+        {
+            throw new AssertionError("Tuples.Raw.prepare() requires a list of receivers");
+        }
+    }
+
+    /**
+     * A raw marker for an IN list of tuples, like "SELECT ... WHERE (a, b, c) IN ?"
+     */
+    public static class INRaw extends AbstractMarker.Raw
+    {
+        public INRaw(int bindIndex)
+        {
+            super(bindIndex);
+        }
+
+        private static ColumnSpecification makeInReceiver(List<? extends ColumnSpecification> receivers) throws InvalidRequestException
+        {
+            List<AbstractType<?>> types = new ArrayList<>(receivers.size());
+            StringBuilder inName = new StringBuilder("in(");
+            for (int i = 0; i < receivers.size(); i++)
+            {
+                ColumnSpecification receiver = receivers.get(i);
+                inName.append(receiver.name);
+                if (i < receivers.size() - 1)
+                    inName.append(",");
+
+                if (receiver.type instanceof CollectionType)
+                    throw new InvalidRequestException("Collection columns do not support IN relations");
+                types.add(receiver.type);
+            }
+            inName.append(')');
+
+            ColumnIdentifier identifier = new ColumnIdentifier(inName.toString(), true);
+            TupleType type = new TupleType(types);
+            return new ColumnSpecification(receivers.get(0).ksName, receivers.get(0).cfName, identifier, ListType.getInstance(type));
+        }
+
+        public AbstractMarker prepare(List<? extends ColumnSpecification> receivers) throws InvalidRequestException
+        {
+            return new InMarker(bindIndex, makeInReceiver(receivers));
+        }
+
+        @Override
+        public AbstractMarker prepare(ColumnSpecification receiver)
+        {
+            throw new AssertionError("Tuples.INRaw.prepare() requires a list of receivers");
+        }
+    }
+
+    /**
+     * Represents a marker for a single tuple, like "SELECT ... WHERE (a, b, c) > ?"
+     */
+    public static class Marker extends AbstractMarker
+    {
+        public Marker(int bindIndex, ColumnSpecification receiver)
+        {
+            super(bindIndex, receiver);
+        }
+
+        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            ByteBuffer value = values.get(bindIndex);
+            if (value == null)
+                return null;
+            // value is known to be non-null here, so it can be split into tuple components without another check
+            return Value.fromSerialized(value, (TupleType)receiver.type);
+        }
+    }
+
+    /**
+     * Represents a marker for a set of IN values that are tuples, like "SELECT ... WHERE (a, b, c) IN ?"
+     */
+    public static class InMarker extends AbstractMarker
+    {
+        protected InMarker(int bindIndex, ColumnSpecification receiver)
+        {
+            super(bindIndex, receiver);
+            assert receiver.type instanceof ListType;
+        }
+
+        public InValue bind(List<ByteBuffer> values) throws InvalidRequestException
+        {
+            ByteBuffer value = values.get(bindIndex);
+            return value == null ? null : InValue.fromSerialized(value, (ListType)receiver.type);
+        }
+    }
+
+    public static String tupleToString(List<?> items)
+    {
+        // Renders the items as "(a, b, c)": the separator is emitted before every item except the first.
+        StringBuilder out = new StringBuilder("(");
+        String separator = "";
+        for (Object item : items)
+        {
+            out.append(separator).append(item);
+            separator = ", ";
+        }
+        out.append(')');
+        return out.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 5fcf085..b47dd92 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -45,7 +45,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
 
     public abstract ResultMessage execute(ClientState state) throws RequestExecutionException, RequestValidationException;
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering users doesn't make sense and is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index db4581e..2c7f2cb 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -47,7 +47,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
 
     public abstract ResultMessage execute(ClientState state) throws RequestValidationException, RequestExecutionException;
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering permission doesn't make sense and is not supported
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6a1201b..875e41c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -327,11 +327,11 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }
 
-    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
 
-        for (IMutation mutation : getMutations(new PreparedBatchVariables(Collections.<ByteBuffer>emptyList()), true, null, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(new PreparedBatchVariables(options.getValues()), true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 448722e..11aa0b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -32,14 +32,11 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.CASConditions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -250,14 +247,21 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
     public void addKeyValue(CFDefinition.Name name, Term value) throws InvalidRequestException
     {
-        addKeyValues(name, new Restriction.EQ(value, false));
+        addKeyValues(name, new SingleColumnRestriction.EQ(value, false));
     }
 
     public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();
-        for (Relation rel : whereClause)
+        for (Relation relation : whereClause)
         {
+            if (!(relation instanceof SingleColumnRelation))
+            {
+                throw new InvalidRequestException(
+                        String.format("Multi-column relations cannot be used in WHERE clauses for modification statements: %s", relation));
+            }
+            SingleColumnRelation rel = (SingleColumnRelation) relation;
+
             CFDefinition.Name name = cfDef.get(rel.getEntity());
             if (name == null)
                 throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
@@ -272,7 +276,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                     {
                         Term t = rel.getValue().prepare(name);
                         t.collectMarkerSpecification(names);
-                        restriction = new Restriction.EQ(t, false);
+                        restriction = new SingleColumnRestriction.EQ(t, false);
                     }
                     else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
                     {
@@ -280,7 +284,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                         {
                             Term t = rel.getValue().prepare(name);
                             t.collectMarkerSpecification(names);
-                            restriction = Restriction.IN.create(t);
+                            restriction = new SingleColumnRestriction.InWithMarker((Lists.Marker)t);
                         }
                         else
                         {
@@ -291,7 +295,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                                 t.collectMarkerSpecification(names);
                                 values.add(t);
                             }
-                            restriction = Restriction.IN.create(values);
+                            restriction = new SingleColumnRestriction.InWithValues(values);
                         }
                     }
                     else
@@ -671,12 +675,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return builder.build();
     }
 
-    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+        List<ByteBuffer> variables = options.getValues();
+        for (IMutation mutation : getMutations(variables, true, null, queryState.getTimestamp()))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/statements/MultiColumnRestriction.java
new file mode 100644
index 0000000..f643684
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/MultiColumnRestriction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public interface MultiColumnRestriction extends Restriction
+{
+    public static class EQ extends SingleColumnRestriction.EQ implements MultiColumnRestriction
+    {
+        public EQ(Term value, boolean onToken)
+        {
+            super(value, onToken);
+        }
+
+        public boolean isMultiColumn()
+        {
+            return true;
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            Tuples.Value t = (Tuples.Value)value.bind(variables);
+            return t.getElements();
+        }
+    }
+
+    public interface IN extends MultiColumnRestriction
+    {
+        public List<List<ByteBuffer>> splitValues(List<ByteBuffer> variables) throws InvalidRequestException;
+    }
+
+    /**
+     * An IN restriction that has a set of terms for in values.
+     * For example: "SELECT ... WHERE (a, b, c) IN ((1, 2, 3), (4, 5, 6))" or "WHERE (a, b, c) IN (?, ?)"
+     */
+    public static class InWithValues extends SingleColumnRestriction.InWithValues implements MultiColumnRestriction.IN
+    {
+        public InWithValues(List<Term> values)
+        {
+            super(values);
+        }
+
+        public boolean isMultiColumn()
+        {
+            return true;
+        }
+
+        public List<List<ByteBuffer>> splitValues(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            List<List<ByteBuffer>> buffers = new ArrayList<>(values.size());
+            for (Term value : values)
+            {
+                Term.MultiItemTerminal term = (Term.MultiItemTerminal)value.bind(variables);
+                buffers.add(term.getElements());
+            }
+            return buffers;
+        }
+    }
+
+    /**
+     * An IN restriction that uses a single marker for a set of IN values that are tuples.
+     * For example: "SELECT ... WHERE (a, b, c) IN ?"
+     */
+    public static class InWithMarker extends SingleColumnRestriction.InWithMarker implements MultiColumnRestriction.IN
+    {
+        public InWithMarker(AbstractMarker marker)
+        {
+            super(marker);
+        }
+
+        public boolean isMultiColumn()
+        {
+            return true;
+        }
+
+        public List<List<ByteBuffer>> splitValues(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            Tuples.InValue inValue = ((Tuples.InMarker) marker).bind(variables);
+            if (inValue == null)
+                throw new InvalidRequestException("Invalid null value for IN restriction");
+            return inValue.getSplitValues();
+        }
+    }
+
+    public static class Slice extends SingleColumnRestriction.Slice implements MultiColumnRestriction
+    {
+        public Slice(boolean onToken)
+        {
+            super(onToken);
+        }
+
+        public boolean isMultiColumn()
+        {
+            return true;
+        }
+
+        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException("Multicolumn slice restrictions do not support bound()");
+        }
+
+        /**
+         * Similar to bound(), but returns one ByteBuffer per component in the bound instead of a single
+         * ByteBuffer to represent the entire bound.
+         */
+        public List<ByteBuffer> componentBounds(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            Tuples.Value value = (Tuples.Value)bounds[b.idx].bind(variables);
+            return value.getElements();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 6323acb..3d33bde 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -18,12 +18,8 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import com.google.common.base.Objects;
-
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.cql3.*;
@@ -39,289 +35,34 @@ public interface Restriction
     public boolean isSlice();
     public boolean isEQ();
     public boolean isIN();
+    public boolean isMultiColumn();
 
     // Only supported for EQ and IN, but it's convenient to have here
     public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
 
-    public static class EQ implements Restriction
-    {
-        private final Term value;
-        private final boolean onToken;
-
-        public EQ(Term value, boolean onToken)
-        {
-            this.value = value;
-            this.onToken = onToken;
-        }
-
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
-        {
-            return Collections.singletonList(value.bindAndGet(variables));
-        }
-
-        public boolean isSlice()
-        {
-            return false;
-        }
-
-        public boolean isEQ()
-        {
-            return true;
-        }
-
-        public boolean isIN()
-        {
-            return false;
-        }
-
-        public boolean isOnToken()
-        {
-            return onToken;
-        }
-
-        @Override
-        public String toString()
-        {
-            return String.format("EQ(%s)%s", value, onToken ? "*" : "");
-        }
-    }
+    public static interface EQ extends Restriction {}
 
-    public static abstract class IN implements Restriction
+    public static interface IN extends Restriction
     {
-        public static IN create(List<Term> values)
-        {
-            return new WithValues(values);
-        }
-
-        public static IN create(Term value) throws InvalidRequestException
-        {
-            assert value instanceof Lists.Marker; // we shouldn't have got there otherwise
-            return new WithMarker((Lists.Marker)value);
-        }
-
-        public boolean isSlice()
-        {
-            return false;
-        }
-
-        public boolean isEQ()
-        {
-            return false;
-        }
-
-        public boolean isIN()
-        {
-            return true;
-        }
-
-        // Used when we need to know if it's a IN with just one value before we have
-        // the bind variables. This is ugly and only there for backward compatiblity
-        // because we used to treate IN with 1 value like an EQ and need to preserve
-        // this behavior.
-        public abstract boolean canHaveOnlyOneValue();
-
-        public boolean isOnToken()
-        {
-            return false;
-        }
-
-        private static class WithValues extends IN
-        {
-            private final List<Term> values;
-
-            private WithValues(List<Term> values)
-            {
-                this.values = values;
-            }
-
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
-            {
-                List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
-                for (Term value : values)
-                    buffers.add(value.bindAndGet(variables));
-                return buffers;
-            }
-
-            public boolean canHaveOnlyOneValue()
-            {
-                return values.size() == 1;
-            }
-
-            @Override
-            public String toString()
-            {
-                return String.format("IN(%s)", values);
-            }
-        }
-
-        private static class WithMarker extends IN
-        {
-            private final Lists.Marker marker;
-
-            private WithMarker(Lists.Marker marker)
-            {
-                this.marker = marker;
-            }
-
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
-            {
-                Lists.Value lval = marker.bind(variables);
-                if (lval == null)
-                    throw new InvalidRequestException("Invalid null value for IN restriction");
-                return lval.elements;
-            }
-
-            public boolean canHaveOnlyOneValue()
-            {
-                return false;
-            }
-
-            @Override
-            public String toString()
-            {
-                return "IN ?";
-            }
-        }
+        public boolean canHaveOnlyOneValue();
     }
 
-    public static class Slice implements Restriction
+    public static interface Slice extends Restriction
     {
-        private final Term[] bounds;
-        private final boolean[] boundInclusive;
-        private final boolean onToken;
-
-        // The name of the column that was preceding this one if the tuple notation of #4851 was used
-        // (note: if it is set for both bound, we'll validate both have the same previous value, but we
-        // still need to distinguish if it's set or not for both bound)
-        private final ColumnIdentifier[] previous;
-
-        public Slice(boolean onToken)
-        {
-            this.bounds = new Term[2];
-            this.boundInclusive = new boolean[2];
-            this.previous = new ColumnIdentifier[2];
-            this.onToken = onToken;
-        }
-
-        public boolean isSlice()
-        {
-            return true;
-        }
-
-        public boolean isEQ()
-        {
-            return false;
-        }
-
-        public boolean isIN()
-        {
-            return false;
-        }
-
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public boolean isOnToken()
-        {
-            return onToken;
-        }
-
-        public boolean hasBound(Bound b)
-        {
-            return bounds[b.idx] != null;
-        }
-
-        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
-        {
-            return bounds[b.idx].bindAndGet(variables);
-        }
-
-        public boolean isInclusive(Bound b)
-        {
-            return bounds[b.idx] == null || boundInclusive[b.idx];
-        }
-
-        public Relation.Type getRelation(Bound eocBound, Bound inclusiveBound)
-        {
-            switch (eocBound)
-            {
-                case START:
-                    return boundInclusive[inclusiveBound.idx] ? Relation.Type.GTE : Relation.Type.GT;
-                case END:
-                    return boundInclusive[inclusiveBound.idx] ? Relation.Type.LTE : Relation.Type.LT;
-            }
-            throw new AssertionError();
-        }
-
-        public IndexOperator getIndexOperator(Bound b)
-        {
-            switch (b)
-            {
-                case START:
-                    return boundInclusive[b.idx] ? IndexOperator.GTE : IndexOperator.GT;
-                case END:
-                    return boundInclusive[b.idx] ? IndexOperator.LTE : IndexOperator.LT;
-            }
-            throw new AssertionError();
-        }
-
-        public void setBound(ColumnIdentifier name, Relation.Type type, Term t, ColumnIdentifier previousName) throws InvalidRequestException
-        {
-            Bound b;
-            boolean inclusive;
-            switch (type)
-            {
-                case GT:
-                    b = Bound.START;
-                    inclusive = false;
-                    break;
-                case GTE:
-                    b = Bound.START;
-                    inclusive = true;
-                    break;
-                case LT:
-                    b = Bound.END;
-                    inclusive = false;
-                    break;
-                case LTE:
-                    b = Bound.END;
-                    inclusive = true;
-                    break;
-                default:
-                    throw new AssertionError();
-            }
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
 
-            if (bounds[b.idx] != null)
-                throw new InvalidRequestException(String.format("Invalid restrictions found on %s", name));
+        /** Returns true if the start or end bound (depending on the argument) is set, false otherwise */
+        public boolean hasBound(Bound b);
 
-            bounds[b.idx] = t;
-            boundInclusive[b.idx] = inclusive;
+        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException;
 
-            // If a bound is part of a tuple notation (#4851), the other bound must either also be or must not be set at all,
-            // and this even if there is a 2ndary index (it's not supported by the 2ndary code). And it's easier to validate
-            // this here so we do.
-            Bound reverse = Bound.reverse(b);
-            if (hasBound(reverse) && !(Objects.equal(previousName, previous[reverse.idx])))
-                throw new InvalidRequestException(String.format("Clustering column %s cannot be restricted both inside a tuple notation and outside it", name));
+        /** Returns true if the start or end bound (depending on the argument) is inclusive, false otherwise */
+        public boolean isInclusive(Bound b);
 
-            previous[b.idx] = previousName;
-        }
+        public Relation.Type getRelation(Bound eocBound, Bound inclusiveBound);
 
-        public boolean isPartOfTuple()
-        {
-            return previous[Bound.START.idx] != null || previous[Bound.END.idx] != null;
-        }
+        public IndexOperator getIndexOperator(Bound b);
 
-        @Override
-        public String toString()
-        {
-            return String.format("SLICE(%s %s, %s %s)%s", boundInclusive[0] ? ">=" : ">",
-                                                          bounds[0],
-                                                          boundInclusive[1] ? "<=" : "<",
-                                                          bounds[1],
-                                                          onToken ? "*" : "");
-        }
+        public void setBound(Relation.Type type, Term t) throws InvalidRequestException;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/43496384/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 337e8dc..94df854 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -73,7 +73,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
         return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName);
     }
 
-    public ResultMessage executeInternal(QueryState state)
+    public ResultMessage executeInternal(QueryState state, QueryOptions options)
     {
         // executeInternal is for local query only, thus altering schema is not supported
         throw new UnsupportedOperationException();