You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/02 20:09:09 UTC

[5/5] cassandra git commit: Refactor SelectStatement and Restrictions

Refactor SelectStatement and Restrictions

Patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-7981


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65a7088e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65a7088e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65a7088e

Branch: refs/heads/trunk
Commit: 65a7088e71061b876e9cd51140f31c92ded92777
Parents: a604b14
Author: blerer <b_...@hotmail.com>
Authored: Tue Dec 2 13:08:25 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Dec 2 13:08:25 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |    2 +
 NEWS.txt                                        |    3 +
 .../cassandra/config/ColumnDefinition.java      |   40 +
 .../cassandra/cql3/ColumnSpecification.java     |    6 +
 src/java/org/apache/cassandra/cql3/Cql.g        |    5 +-
 .../cassandra/cql3/MultiColumnRelation.java     |  130 +-
 .../org/apache/cassandra/cql3/Operator.java     |  119 +-
 .../org/apache/cassandra/cql3/Relation.java     |  221 ++-
 .../cassandra/cql3/SingleColumnRelation.java    |  181 +-
 .../apache/cassandra/cql3/TokenRelation.java    |  164 ++
 src/java/org/apache/cassandra/cql3/Tuples.java  |    3 +-
 .../cassandra/cql3/VariableSpecifications.java  |   10 +
 .../AbstractPrimaryKeyRestrictions.java         |   36 +
 .../cql3/restrictions/AbstractRestriction.java  |  129 ++
 .../ForwardingPrimaryKeyRestrictions.java       |  159 ++
 .../restrictions/MultiColumnRestriction.java    |  520 ++++++
 .../restrictions/PrimaryKeyRestrictions.java    |   40 +
 .../cql3/restrictions/Restriction.java          |   97 ++
 .../cql3/restrictions/Restrictions.java         |   82 +
 .../ReversedPrimaryKeyRestrictions.java         |   77 +
 .../SingleColumnPrimaryKeyRestrictions.java     |  307 ++++
 .../restrictions/SingleColumnRestriction.java   |  477 ++++++
 .../restrictions/SingleColumnRestrictions.java  |  209 +++
 .../restrictions/StatementRestrictions.java     |  600 +++++++
 .../cassandra/cql3/restrictions/TermSlice.java  |  167 ++
 .../cql3/restrictions/TokenRestriction.java     |  224 +++
 .../cassandra/cql3/selection/Selection.java     |  123 +-
 .../apache/cassandra/cql3/statements/Bound.java |   14 +-
 .../cql3/statements/DeleteStatement.java        |    1 +
 .../cql3/statements/ModificationStatement.java  |   35 +-
 .../cql3/statements/MultiColumnRestriction.java |  137 --
 .../cql3/statements/RequestValidations.java     |  194 +++
 .../cassandra/cql3/statements/Restriction.java  |   79 -
 .../cql3/statements/SelectStatement.java        | 1597 +++---------------
 .../statements/SingleColumnRestriction.java     |  486 ------
 .../cassandra/db/composites/Composites.java     |   22 +-
 .../db/composites/CompositesBuilder.java        |   15 +-
 .../cassandra/db/marshal/CollectionType.java    |   34 +-
 .../exceptions/UnrecognizedEntityException.java |   49 +
 .../org/apache/cassandra/cql3/AliasTest.java    |   40 +
 .../cassandra/cql3/ContainsRelationTest.java    |   39 +-
 .../cassandra/cql3/FrozenCollectionsTest.java   |   16 +-
 .../cassandra/cql3/MultiColumnRelationTest.java |  161 +-
 .../cql3/SelectWithTokenFunctionTest.java       |   39 +-
 .../cql3/SingleColumnRelationTest.java          |  218 ++-
 .../cassandra/cql3/ThriftCompatibilityTest.java |    2 +-
 46 files changed, 5031 insertions(+), 2278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3cb1c0f..6761c31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Refactor SelectStatement, return IN results in natural order instead
+   of IN value list order (CASSANDRA-7981)
  * Support UDTs, tuples, and collections in user-defined
    functions (CASSANDRA-7563)
  * Fix aggregate fn results on empty selection, result column name,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1d168f0..8d8ebdc 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -33,6 +33,9 @@ Upgrading
      in 2.0.0). Please switch to CQL3 if you haven't already done so.
    - Very large batches will now be rejected (defaults to 50kb). This
      can be customized by modifying batch_size_fail_threshold_in_kb.
+   - The results of CQL3 queries containing an IN restriction will now be returned
+     in the natural order, and no longer in the order in which the column values
+     were specified in the IN restriction.
 
 2.1.2
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 10a5a8b..354a6f1 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -21,7 +21,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.db.*;
@@ -171,11 +173,31 @@ public class ColumnDefinition extends ColumnSpecification
         return componentIndex == null;
     }
 
+    public boolean isPartitionKey()
+    {
+        return kind == Kind.PARTITION_KEY;
+    }
+
+    public boolean isClusteringColumn()
+    {
+        return kind == Kind.CLUSTERING_COLUMN;
+    }
+
     public boolean isStatic()
     {
         return kind == Kind.STATIC;
     }
 
+    public boolean isRegular()
+    {
+        return kind == Kind.REGULAR;
+    }
+
+    public boolean isCompactValue()
+    {
+        return kind == Kind.COMPACT_VALUE;
+    }
+
     // The componentIndex. This never return null however for convenience sake:
     // if componentIndex == null, this return 0. So caller should first check
     // isOnAllComponents() to distinguish if that's a possibility.
@@ -425,4 +447,22 @@ public class ColumnDefinition extends ColumnSpecification
     {
         return indexOptions.containsKey(name);
     }
+
+    /**
+     * Converts the specified column definitions into column identifiers.
+     *
+     * @param definitions the column definitions to convert.
+     * @return the column identifiers corresponding to the specified definitions
+     */
+    public static List<ColumnIdentifier> toIdentifiers(List<ColumnDefinition> definitions)
+    {
+        return Lists.transform(definitions, new Function<ColumnDefinition, ColumnIdentifier>()
+        {
+            @Override
+            public ColumnIdentifier apply(ColumnDefinition columnDef)
+            {
+                return columnDef.name;
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
index 2584f85..cc54375 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnSpecification.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ReversedType;
 
 public class ColumnSpecification
 {
@@ -44,4 +45,9 @@ public class ColumnSpecification
     {
         return new ColumnSpecification(ksName, cfName, alias, type);
     }
+    
+    public boolean isReversedType()
+    {
+        return type instanceof ReversedType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 4c051e3..77156f2 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -1091,10 +1091,7 @@ relationType returns [Operator op]
 relation[List<Relation> clauses]
     : name=cident type=relationType t=term { $clauses.add(new SingleColumnRelation(name, type, t)); }
     | K_TOKEN l=tupleOfIdentifiers type=relationType t=term
-        {
-            for (ColumnIdentifier.Raw id : l)
-                $clauses.add(new SingleColumnRelation(id, type, t, true));
-        }
+        { $clauses.add(new TokenRelation(l, type, t)); }
     | name=cident K_IN marker=inMarker
         { $clauses.add(new SingleColumnRelation(name, Operator.IN, marker)); }
     | name=cident K_IN inValues=singleColumnInValues

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
index 37eb69e..d754968 100644
--- a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
@@ -17,8 +17,22 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Term.MultiColumnRaw;
+import org.apache.cassandra.cql3.Term.Raw;
+import org.apache.cassandra.cql3.restrictions.MultiColumnRestriction;
+import org.apache.cassandra.cql3.restrictions.Restriction;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
 /**
  * A relation using the tuple notation, which typically affects multiple columns.
  * Examples:
@@ -55,6 +69,7 @@ public class MultiColumnRelation extends Relation
      * @param entities the columns on the LHS of the relation
      * @param relationType the relation operator
      * @param valuesOrMarker a Tuples.Literal instance or a Tuples.Raw marker
+     * @return a new <code>MultiColumnRelation</code> instance
      */
     public static MultiColumnRelation createNonInRelation(List<ColumnIdentifier.Raw> entities, Operator relationType, Term.MultiColumnRaw valuesOrMarker)
     {
@@ -67,6 +82,7 @@ public class MultiColumnRelation extends Relation
      * For example: "SELECT ... WHERE (a, b) IN ((0, 1), (2, 3))"
      * @param entities the columns on the LHS of the relation
      * @param inValues a list of Tuples.Literal instances or a Tuples.Raw markers
+     * @return a new <code>MultiColumnRelation</code> instance
      */
     public static MultiColumnRelation createInRelation(List<ColumnIdentifier.Raw> entities, List<? extends Term.MultiColumnRaw> inValues)
     {
@@ -78,6 +94,7 @@ public class MultiColumnRelation extends Relation
      * For example: "SELECT ... WHERE (a, b) IN ?"
      * @param entities the columns on the LHS of the relation
      * @param inMarker a single IN marker
+     * @return a new <code>MultiColumnRelation</code> instance
      */
     public static MultiColumnRelation createSingleMarkerInRelation(List<ColumnIdentifier.Raw> entities, Tuples.INRaw inMarker)
     {
@@ -91,54 +108,109 @@ public class MultiColumnRelation extends Relation
 
     /**
      * For non-IN relations, returns the Tuples.Literal or Tuples.Raw marker for a single tuple.
+     * @return the Tuples.Literal or Tuples.Raw marker for non-IN relations, or the IN marker (if any) for IN relations.
      */
-    public Term.MultiColumnRaw getValue()
+    private Term.MultiColumnRaw getValue()
     {
-        assert relationType != Operator.IN;
-        return valuesOrMarker;
+        return relationType == Operator.IN ? inMarker : valuesOrMarker;
     }
 
-    /**
-     * For IN relations, returns the list of Tuples.Literal instances or Tuples.Raw markers.
-     * If a single IN marker was used, this will return null;
-     */
-    public List<? extends Term.MultiColumnRaw> getInValues()
+    @Override
+    public boolean isMultiColumn()
     {
+        return true;
+    }
 
-        return inValues;
+    @Override
+    protected Restriction newEQRestriction(CFMetaData cfm,
+                                           VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        List<ColumnDefinition> receivers = receivers(cfm);
+        Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+        return new MultiColumnRestriction.EQ(cfm.comparator, receivers, term);
     }
 
-    /**
-     * For IN relations, returns the single marker for the IN values if there is one, otherwise null.
-     */
-    public Tuples.INRaw getInMarker()
+    @Override
+    protected Restriction newINRestriction(CFMetaData cfm,
+                                           VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        List<ColumnDefinition> receivers = receivers(cfm);
+        List<Term> terms = toTerms(receivers, inValues, cfm.ksName, boundNames);
+        if (terms == null)
+        {
+            Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
+            return new MultiColumnRestriction.InWithMarker(cfm.comparator, receivers, (AbstractMarker) term);
+        }
+        return new MultiColumnRestriction.InWithValues(cfm.comparator, receivers, terms);
+    }
+
+    @Override
+    protected Restriction newSliceRestriction(CFMetaData cfm,
+                                              VariableSpecifications boundNames,
+                                              Bound bound,
+                                              boolean inclusive) throws InvalidRequestException
     {
-        return inMarker;
+        List<ColumnDefinition> receivers = receivers(cfm);
+        Term term = toTerm(receivers(cfm), getValue(), cfm.ksName, boundNames);
+        return new MultiColumnRestriction.Slice(cfm.comparator, receivers, bound, inclusive, term);
     }
 
-    public boolean isMultiColumn()
+    @Override
+    protected Restriction newContainsRestriction(CFMetaData cfm,
+                                                 VariableSpecifications boundNames,
+                                                 boolean isKey) throws InvalidRequestException
     {
-        return true;
+        throw invalidRequest("%s cannot be used for Multi-column relations", operator());
     }
 
     @Override
-    public String toString()
+    protected Term toTerm(List<? extends ColumnSpecification> receivers,
+                          Raw raw,
+                          String keyspace,
+                          VariableSpecifications boundNames) throws InvalidRequestException
     {
-        if (relationType == Operator.IN)
+        Term term = ((MultiColumnRaw) raw).prepare(keyspace, receivers);
+        term.collectMarkerSpecification(boundNames);
+        return term;
+    }
+
+    protected List<ColumnDefinition> receivers(CFMetaData cfm) throws InvalidRequestException
+    {
+        List<ColumnDefinition> names = new ArrayList<>(getEntities().size());
+        int previousPosition = -1;
+        for (ColumnIdentifier.Raw raw : getEntities())
         {
-            StringBuilder sb = new StringBuilder(Tuples.tupleToString(entities));
-            sb.append(" IN ");
-            sb.append(inMarker != null ? '?' : Tuples.tupleToString(inValues));
-            return sb.toString();
+            ColumnDefinition def = toColumnDefinition(cfm, raw);
+            checkTrue(def.isClusteringColumn(), "Multi-column relations can only be applied to clustering columns but was applied to: %s", def.name);
+            checkFalse(names.contains(def), "Column \"%s\" appeared twice in a relation: %s", def.name, this);
+
+            // check that no clustering columns were skipped
+            if (def.position() != previousPosition + 1)
+            {
+                checkFalse(previousPosition == -1, "Clustering columns may not be skipped in multi-column relations. " +
+                                                   "They should appear in the PRIMARY KEY order. Got %s", this);
+                throw invalidRequest("Clustering columns must appear in the PRIMARY KEY order in multi-column relations: %s", this);
+            }
+            names.add(def);
+            previousPosition = def.position();
         }
-        else
+        return names;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder builder = new StringBuilder(Tuples.tupleToString(entities));
+        if (isIN())
         {
-            StringBuilder sb = new StringBuilder(Tuples.tupleToString(entities));
-            sb.append(" ");
-            sb.append(relationType);
-            sb.append(" ");
-            sb.append(valuesOrMarker);
-            return sb.toString();
+            return builder.append(" IN ")
+                          .append(inMarker != null ? '?' : Tuples.tupleToString(inValues))
+                          .toString();
         }
+        return builder.append(" ")
+                      .append(relationType)
+                      .append(" ")
+                      .append(valuesOrMarker)
+                      .toString();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/Operator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operator.java b/src/java/org/apache/cassandra/cql3/Operator.java
index 359fcb8..86bcbd3 100644
--- a/src/java/org/apache/cassandra/cql3/Operator.java
+++ b/src/java/org/apache/cassandra/cql3/Operator.java
@@ -23,7 +23,92 @@ import java.io.IOException;
 
 public enum Operator
 {
-    EQ(0), LT(4), LTE(3), GTE(1), GT(2), IN(7), CONTAINS(5), CONTAINS_KEY(6), NEQ(8);
+    EQ(0)
+    {
+        @Override
+        public String toString()
+        {
+            return "=";
+        }
+    },
+    LT(4)
+    {
+        @Override
+        public String toString()
+        {
+            return "<";
+        }
+
+        @Override
+        public Operator reverse()
+        {
+            return GT;
+        }
+    },
+    LTE(3)
+    {
+        @Override
+        public String toString()
+        {
+            return "<=";
+        }
+
+        @Override
+        public Operator reverse()
+        {
+            return GTE;
+        }
+    },
+    GTE(1)
+    {
+        @Override
+        public String toString()
+        {
+            return ">=";
+        }
+
+        @Override
+        public Operator reverse()
+        {
+            return LTE;
+        }
+    },
+    GT(2)
+    {
+        @Override
+        public String toString()
+        {
+            return ">";
+        }
+
+        @Override
+        public Operator reverse()
+        {
+            return LT;
+        }
+    },
+    IN(7)
+    {
+    },
+    CONTAINS(5)
+    {
+    },
+    CONTAINS_KEY(6)
+    {
+        @Override
+        public String toString()
+        {
+            return "CONTAINS KEY";
+        }
+    },
+    NEQ(8)
+    {
+        @Override
+        public String toString()
+        {
+            return "!=";
+        }
+    };
 
     /**
      * The binary representation of this <code>Enum</code> value.
@@ -70,24 +155,16 @@ public enum Operator
     @Override
     public String toString()
     {
-        switch (this)
-        {
-            case EQ:
-                return "=";
-            case LT:
-                return "<";
-            case LTE:
-                return "<=";
-            case GT:
-                return ">";
-            case GTE:
-                return ">=";
-            case NEQ:
-                return "!=";
-            case CONTAINS_KEY:
-                return "CONTAINS KEY";
-            default:
-                return this.name();
-        }
+         return this.name();
+    }
+
+    /**
+     * Returns the reverse operator of this one.
+     *
+     * @return the reverse operator of this one.
+     */
+    public Operator reverse()
+    {
+        return this;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java
index 91d4100..1337096 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -17,6 +17,18 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.restrictions.Restriction;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnrecognizedEntityException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
 public abstract class Relation {
 
     protected Operator relationType;
@@ -26,5 +38,212 @@ public abstract class Relation {
         return relationType;
     }
 
-    public abstract boolean isMultiColumn();
+    /**
+     * Checks if this relation applies to multiple columns.
+     *
+     * @return <code>true</code> if this relation applies to multiple columns, <code>false</code> otherwise.
+     */
+    public boolean isMultiColumn()
+    {
+        return false;
+    }
+
+    /**
+     * Checks if this relation is a token relation (e.g. <pre>token(a) = token(1)</pre>).
+     *
+     * @return <code>true</code> if this relation is a token relation, <code>false</code> otherwise.
+     */
+    public boolean onToken()
+    {
+        return false;
+    }
+
+    /**
+     * Checks if the operator of this relation is a <code>CONTAINS</code>.
+     * @return <code>true</code>  if the operator of this relation is a <code>CONTAINS</code>, <code>false</code>
+     * otherwise.
+     */
+    public final boolean isContains()
+    {
+        return relationType == Operator.CONTAINS;
+    }
+
+    /**
+     * Checks if the operator of this relation is a <code>CONTAINS_KEY</code>.
+     * @return <code>true</code>  if the operator of this relation is a <code>CONTAINS_KEY</code>, <code>false</code>
+     * otherwise.
+     */
+    public final boolean isContainsKey()
+    {
+        return relationType == Operator.CONTAINS_KEY;
+    }
+
+    /**
+     * Checks if the operator of this relation is an <code>IN</code>.
+     * @return <code>true</code>  if the operator of this relation is an <code>IN</code>, <code>false</code>
+     * otherwise.
+     */
+    public final boolean isIN()
+    {
+        return relationType == Operator.IN;
+    }
+
+    /**
+     * Checks if the operator of this relation is an <code>EQ</code>.
+     * @return <code>true</code>  if the operator of this relation is an <code>EQ</code>, <code>false</code>
+     * otherwise.
+     */
+    public final boolean isEQ()
+    {
+        return relationType == Operator.EQ;
+    }
+
+    /**
+     * Checks if the operator of this relation is a <code>Slice</code> (GT, GTE, LTE, LT).
+     *
+     * @return <code>true</code> if the operator of this relation is a <code>Slice</code>, <code>false</code> otherwise.
+     */
+    public final boolean isSlice()
+    {
+        return relationType == Operator.GT
+                || relationType == Operator.GTE
+                || relationType == Operator.LTE
+                || relationType == Operator.LT;
+    }
+
+    /**
+     * Converts this <code>Relation</code> into a <code>Restriction</code>.
+     *
+     * @param cfm the Column Family meta data
+     * @param boundNames the variables specification where to collect the bind variables
+     * @return the <code>Restriction</code> corresponding to this <code>Relation</code>
+     * @throws InvalidRequestException if this <code>Relation</code> is not valid
+     */
+    public final Restriction toRestriction(CFMetaData cfm,
+                                           VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        switch (relationType)
+        {
+            case EQ: return newEQRestriction(cfm, boundNames);
+            case LT: return newSliceRestriction(cfm, boundNames, Bound.END, false);
+            case LTE: return newSliceRestriction(cfm, boundNames, Bound.END, true);
+            case GTE: return newSliceRestriction(cfm, boundNames, Bound.START, true);
+            case GT: return newSliceRestriction(cfm, boundNames, Bound.START, false);
+            case IN: return newINRestriction(cfm, boundNames);
+            case CONTAINS: return newContainsRestriction(cfm, boundNames, false);
+            case CONTAINS_KEY: return newContainsRestriction(cfm, boundNames, true);
+            default: throw invalidRequest("Unsupported \"!=\" relation: %s", this);
+        }
+    }
+
+    /**
+     * Creates a new EQ restriction instance.
+     *
+     * @param cfm the Column Family meta data
+     * @param boundNames the variables specification where to collect the bind variables
+     * @return a new EQ restriction instance.
+     * @throws InvalidRequestException if the relation cannot be converted into an EQ restriction.
+     */
+    protected abstract Restriction newEQRestriction(CFMetaData cfm,
+                                                    VariableSpecifications boundNames) throws InvalidRequestException;
+
+    /**
+     * Creates a new IN restriction instance.
+     *
+     * @param cfm the Column Family meta data
+     * @param boundNames the variables specification where to collect the bind variables
+     * @return a new IN restriction instance
+     * @throws InvalidRequestException if the relation cannot be converted into an IN restriction.
+     */
+    protected abstract Restriction newINRestriction(CFMetaData cfm,
+                                                    VariableSpecifications boundNames) throws InvalidRequestException;
+
+    /**
+     * Creates a new Slice restriction instance.
+     *
+     * @param cfm the Column Family meta data
+     * @param boundNames the variables specification where to collect the bind variables
+     * @param bound the slice bound
+     * @param inclusive <code>true</code> if the bound is included.
+     * @return a new slice restriction instance
+     * @throws InvalidRequestException if the <code>Relation</code> is not valid
+     */
+    protected abstract Restriction newSliceRestriction(CFMetaData cfm,
+                                                       VariableSpecifications boundNames,
+                                                       Bound bound,
+                                                       boolean inclusive) throws InvalidRequestException;
+
+    /**
+     * Creates a new Contains restriction instance.
+     *
+     * @param cfm the Column Family meta data
+     * @param boundNames the variables specification where to collect the bind variables
+     * @param isKey <code>true</code> if the restriction to create is a CONTAINS KEY
+     * @return a new Contains <code>Restriction</code> instance
+     * @throws InvalidRequestException if the <code>Relation</code> is not valid
+     */
+    protected abstract Restriction newContainsRestriction(CFMetaData cfm,
+                                                          VariableSpecifications boundNames,
+                                                          boolean isKey) throws InvalidRequestException;
+
+    /**
+     * Converts the specified <code>Raw</code> into a <code>Term</code>.
+     * @param receivers the columns with which the values must be associated
+     * @param raw the raw term to convert
+     * @param keyspace the keyspace name
+     * @param boundNames the variables specification where to collect the bind variables
+     *
+     * @return the <code>Term</code> corresponding to the specified <code>Raw</code>
+     * @throws InvalidRequestException if the <code>Raw</code> term is not valid
+     */
+    protected abstract Term toTerm(List<? extends ColumnSpecification> receivers,
+                                   Term.Raw raw,
+                                   String keyspace,
+                                   VariableSpecifications boundNames)
+                                   throws InvalidRequestException;
+
+    /**
+     * Converts the specified <code>Raw</code> terms into <code>Term</code>s.
+     * @param receivers the columns with which the values must be associated
+     * @param raws the raw terms to convert
+     * @param keyspace the keyspace name
+     * @param boundNames the variables specification where to collect the bind variables
+     *
+     * @return the <code>Term</code>s corresponding to the specified <code>Raw</code> terms
+     * @throws InvalidRequestException if the <code>Raw</code> terms are not valid
+     */
+    protected final List<Term> toTerms(List<? extends ColumnSpecification> receivers,
+                                       List<? extends Term.Raw> raws,
+                                       String keyspace,
+                                       VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        if (raws == null)
+            return null;
+
+        List<Term> terms = new ArrayList<>();
+        for (int i = 0, m = raws.size(); i < m; i++)
+            terms.add(toTerm(receivers, raws.get(i), keyspace, boundNames));
+
+        return terms;
+    }
+
+    /**
+     * Converts the specified entity into a column definition.
+     *
+     * @param cfm the column family meta data
+     * @param entity the entity to convert
+     * @return the column definition corresponding to the specified entity
+     * @throws InvalidRequestException if the entity cannot be recognized
+     */
+    protected final ColumnDefinition toColumnDefinition(CFMetaData cfm,
+                                                        ColumnIdentifier.Raw entity) throws InvalidRequestException
+    {
+        ColumnIdentifier identifier = entity.prepare(cfm);
+        ColumnDefinition def = cfm.getColumnDefinition(identifier);
+
+        if (def == null)
+            throw new UnrecognizedEntityException(identifier, this);
+
+        return def;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
index d5109f5..7817d43 100644
--- a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
@@ -17,27 +17,38 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Term.Raw;
+import org.apache.cassandra.cql3.restrictions.Restriction;
+import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+
 /**
  * Relations encapsulate the relationship between an entity of some kind, and
  * a value (term). For example, <key> > "start" or "colname1" = "somevalue".
  *
  */
-public class SingleColumnRelation extends Relation
+public final class SingleColumnRelation extends Relation
 {
     private final ColumnIdentifier.Raw entity;
     private final Term.Raw value;
     private final List<Term.Raw> inValues;
-    public final boolean onToken;
 
-    private SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value, List<Term.Raw> inValues, boolean onToken)
+    private SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value, List<Term.Raw> inValues)
     {
         this.entity = entity;
         this.relationType = type;
         this.value = value;
         this.inValues = inValues;
-        this.onToken = onToken;
     }
 
     /**
@@ -49,17 +60,12 @@ public class SingleColumnRelation extends Relation
      */
     public SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value)
     {
-        this(entity, type, value, null, false);
-    }
-
-    public SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value, boolean onToken)
-    {
-        this(entity, type, value, null, onToken);
+        this(entity, type, value, null);
     }
 
     public static SingleColumnRelation createInRelation(ColumnIdentifier.Raw entity, List<Term.Raw> inValues)
     {
-        return new SingleColumnRelation(entity, Operator.IN, null, inValues, false);
+        return new SingleColumnRelation(entity, Operator.IN, null, inValues);
     }
 
     public ColumnIdentifier.Raw getEntity()
@@ -67,21 +73,18 @@ public class SingleColumnRelation extends Relation
         return entity;
     }
 
-    public Term.Raw getValue()
-    {
-        assert relationType != Operator.IN || value == null || value instanceof AbstractMarker.INRaw;
-        return value;
-    }
-
-    public List<Term.Raw> getInValues()
+    @Override
+    protected Term toTerm(List<? extends ColumnSpecification> receivers,
+                          Raw raw,
+                          String keyspace,
+                          VariableSpecifications boundNames)
+                          throws InvalidRequestException
     {
-        assert relationType == Operator.IN;
-        return inValues;
-    }
+        assert receivers.size() == 1;
 
-    public boolean isMultiColumn()
-    {
-        return false;
+        Term term = raw.prepare(keyspace, receivers.get(0));
+        term.collectMarkerSpecification(boundNames);
+        return term;
     }
 
     public SingleColumnRelation withNonStrictOperator()
@@ -97,11 +100,131 @@ public class SingleColumnRelation extends Relation
     @Override
     public String toString()
     {
-        if (relationType == Operator.IN)
+        if (isIN())
             return String.format("%s IN %s", entity, inValues);
-        else if (onToken)
-            return String.format("token(%s) %s %s", entity, relationType, value);
-        else
-            return String.format("%s %s %s", entity, relationType, value);
+
+        return String.format("%s %s %s", entity, relationType, value);
+    }
+
+    @Override
+    protected Restriction newEQRestriction(CFMetaData cfm,
+                                           VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
+        Term term = toTerm(toReceivers(cfm, columnDef), value, cfm.ksName, boundNames);
+        return new SingleColumnRestriction.EQ(columnDef, term);
+    }
+
+    @Override
+    protected Restriction newINRestriction(CFMetaData cfm,
+                                           VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        ColumnDefinition columnDef = toColumnDefinition(cfm, entity); // unknown column -> UnrecognizedEntityException, not a null def
+        List<? extends ColumnSpecification> receivers = toReceivers(cfm, columnDef);
+        List<Term> terms = toTerms(receivers, inValues, cfm.ksName, boundNames);
+        if (terms == null) // toTerms returns null when inValues is null: the IN operand is the single raw value
+        {
+            Term term = toTerm(receivers, value, cfm.ksName, boundNames);
+            return new SingleColumnRestriction.InWithMarker(columnDef, (Lists.Marker) term);
+        }
+        return new SingleColumnRestriction.InWithValues(columnDef, terms);
+    }
+
+    @Override
+    protected Restriction newSliceRestriction(CFMetaData cfm,
+                                              VariableSpecifications boundNames,
+                                              Bound bound,
+                                              boolean inclusive) throws InvalidRequestException
+    {
+        ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
+        Term term = toTerm(toReceivers(cfm, columnDef), value, cfm.ksName, boundNames);
+        return new SingleColumnRestriction.Slice(columnDef, bound, inclusive, term);
+    }
+
+    @Override
+    protected Restriction newContainsRestriction(CFMetaData cfm,
+                                                 VariableSpecifications boundNames,
+                                                 boolean isKey) throws InvalidRequestException
+    {
+        ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
+        Term term = toTerm(toReceivers(cfm, columnDef), value, cfm.ksName, boundNames);
+        return new SingleColumnRestriction.Contains(columnDef, term, isKey);
+    }
+
+    /**
+     * Returns the receivers for this relation.
+     *
+     * @param cfm the Column Family meta data
+     * @param columnDef the column definition
+     * @return the receivers for the specified relation.
+     * @throws InvalidRequestException if the relation is invalid
+     */
+    private List<? extends ColumnSpecification> toReceivers(CFMetaData cfm, ColumnDefinition columnDef) throws InvalidRequestException
+    {
+        ColumnSpecification receiver = columnDef;
+
+        checkFalse(columnDef.isCompactValue(),
+                   "Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported",
+                   columnDef.name);
+
+        if (isIN())
+        {
+            // For partition keys we only support IN for the last name so far
+            checkFalse(columnDef.isPartitionKey() && !isLastPartitionKey(cfm, columnDef),
+                      "Partition KEY part %s cannot be restricted by IN relation (only the last part of the partition key can)",
+                      columnDef.name);
+
+            // We only allow IN on the row key and the clustering key so far, never on non-PK columns, and this even if
+            // there's an index
+            // Note: for backward compatibility reasons, we consider an IN of 1 value the same as an EQ, so we let that
+            // slide.
+            checkFalse(!columnDef.isPrimaryKeyColumn() && !canHaveOnlyOneValue(),
+                       "IN predicates on non-primary-key columns (%s) is not yet supported", columnDef.name);
+        }
+        else if (isSlice())
+        {
+            // Non EQ relation is not supported without token(), even if we have a 2ndary index (since even those
+            // are ordered by partitioner).
+            // Note: In theory we could allow it for 2ndary index queries with ALLOW FILTERING, but that would
+            // probably require some special casing
+            // Note bis: This is also why we don't bother handling the 'tuple' notation of #4851 for keys. If we
+            // lift the limitation for 2ndary
+            // index with filtering, we'll need to handle it though.
+            checkFalse(columnDef.isPartitionKey(), "Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
+        }
+
+        checkFalse(isContainsKey() && !(receiver.type instanceof MapType), "Cannot use CONTAINS KEY on non-map column %s", receiver.name);
+
+        if (receiver.type.isCollection())
+        {
+            // We don't support relations against entire collections (unless they're frozen), like "numbers = {1, 2, 3}"
+            checkFalse(receiver.type.isMultiCell() && !(isContainsKey() || isContains()),
+                       "Collection column '%s' (%s) cannot be restricted by a '%s' relation",
+                       receiver.name,
+                       receiver.type.asCQL3Type(),
+                       operator());
+
+            if (isContainsKey() || isContains())
+                receiver = ((CollectionType<?>) receiver.type).makeCollectionReceiver(receiver, isContainsKey());
+        }
+        return Collections.singletonList(receiver);
+    }
+
+    /**
+     * Checks if the specified column is the last column of the partition key.
+     *
+     * @param cfm the column family meta data
+     * @param columnDef the column to check
+     * @return <code>true</code> if the specified column is the last column of the partition key, <code>false</code>
+     * otherwise.
+     */
+    private static boolean isLastPartitionKey(CFMetaData cfm, ColumnDefinition columnDef)
+    {
+        return columnDef.position() == cfm.partitionKeyColumns().size() - 1;
+    }
+
+    private boolean canHaveOnlyOneValue()
+    {
+        return isEQ() || (isIN() && inValues != null && inValues.size() == 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/TokenRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/TokenRelation.java b/src/java/org/apache/cassandra/cql3/TokenRelation.java
new file mode 100644
index 0000000..d1bd265
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/TokenRelation.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Term.Raw;
+import org.apache.cassandra.cql3.restrictions.Restriction;
+import org.apache.cassandra.cql3.restrictions.TokenRestriction;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsNoDuplicates;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsOnly;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+/**
+ * A relation using the token function.
+ * Examples:
+ * <ul>
+ * <li>SELECT ... WHERE token(a) &gt; token(1)</li>
+ * <li>SELECT ... WHERE token(a, b) &gt; token(1, 3)</li>
+ * </ul>
+ */
+public final class TokenRelation extends Relation
+{
+    private final List<ColumnIdentifier.Raw> entities;
+
+    private final Term.Raw value;
+
+    public TokenRelation(List<ColumnIdentifier.Raw> entities, Operator type, Term.Raw value)
+    {
+        this.entities = entities;
+        this.relationType = type;
+        this.value = value;
+    }
+
+    @Override
+    public boolean onToken()
+    {
+        return true;
+    }
+
+    @Override
+    protected Restriction newEQRestriction(CFMetaData cfm, VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm);
+        Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames);
+        return new TokenRestriction.EQ(columnDefs, term);
+    }
+
+    @Override
+    protected Restriction newINRestriction(CFMetaData cfm, VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        throw invalidRequest("%s cannot be used with the token function", operator());
+    }
+
+    @Override
+    protected Restriction newSliceRestriction(CFMetaData cfm,
+                                              VariableSpecifications boundNames,
+                                              Bound bound,
+                                              boolean inclusive) throws InvalidRequestException
+    {
+        List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm);
+        Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames);
+        return new TokenRestriction.Slice(columnDefs, bound, inclusive, term);
+    }
+
+    @Override
+    protected Restriction newContainsRestriction(CFMetaData cfm, VariableSpecifications boundNames, boolean isKey) throws InvalidRequestException
+    {
+        throw invalidRequest("%s cannot be used with the token function", operator());
+    }
+
+    @Override
+    protected Term toTerm(List<? extends ColumnSpecification> receivers,
+                          Raw raw,
+                          String keyspace,
+                          VariableSpecifications boundNames) throws InvalidRequestException
+    {
+        Term term = raw.prepare(keyspace, receivers.get(0));
+        term.collectMarkerSpecification(boundNames);
+        return term;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("token(%s) %s %s", Tuples.tupleToString(entities), relationType, value);
+    }
+
+    /**
+     * Returns the definitions of the columns to which the token restriction applies.
+     *
+     * @param cfm the column family metadata
+     * @return the definitions of the columns to which the token restriction applies.
+     * @throws InvalidRequestException if the entity cannot be resolved
+     */
+    private List<ColumnDefinition> getColumnDefinitions(CFMetaData cfm) throws InvalidRequestException
+    {
+        List<ColumnDefinition> columnDefs = new ArrayList<>();
+        for ( ColumnIdentifier.Raw raw : entities)
+        {
+            columnDefs.add(toColumnDefinition(cfm, raw));
+        }
+        return columnDefs;
+    }
+
+    /**
+     * Returns the receivers for this relation.
+     *
+     * @param cfm the Column Family meta data
+     * @param columnDefs the column definitions
+     * @return the receivers for the specified relation.
+     * @throws InvalidRequestException if the relation is invalid
+     */
+    private static List<? extends ColumnSpecification> toReceivers(CFMetaData cfm,
+                                                                   List<ColumnDefinition> columnDefs)
+                                                                   throws InvalidRequestException
+    {
+
+        if (!columnDefs.equals(cfm.partitionKeyColumns()))
+        {
+            checkTrue(columnDefs.containsAll(cfm.partitionKeyColumns()),
+                      "The token() function must be applied to all partition key components or none of them");
+
+            checkContainsNoDuplicates(columnDefs, "The token() function contains duplicate partition key components");
+
+            checkContainsOnly(columnDefs, cfm.partitionKeyColumns(), "The token() function must contains only partition key components");
+
+            throw invalidRequest("The token function arguments must be in the partition key order: %s",
+                                 Joiner.on(", ").join(ColumnDefinition.toIdentifiers(cfm.partitionKeyColumns())));
+        }
+
+        ColumnDefinition firstColumn = columnDefs.get(0);
+        return Collections.singletonList(new ColumnSpecification(firstColumn.ksName,
+                                                                 firstColumn.cfName,
+                                                                 new ColumnIdentifier("partition key token", true),
+                                                                 StorageService.getPartitioner().getTokenValidator()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/Tuples.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Tuples.java b/src/java/org/apache/cassandra/cql3/Tuples.java
index b203546..a66c534 100644
--- a/src/java/org/apache/cassandra/cql3/Tuples.java
+++ b/src/java/org/apache/cassandra/cql3/Tuples.java
@@ -23,6 +23,7 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.Term.MultiColumnRaw;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
@@ -319,7 +320,7 @@ public class Tuples
     /**
      * A raw marker for an IN list of tuples, like "SELECT ... WHERE (a, b, c) IN ?"
      */
-    public static class INRaw extends AbstractMarker.Raw
+    public static class INRaw extends AbstractMarker.Raw implements MultiColumnRaw
     {
         public INRaw(int bindIndex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index ef78619..0a55ced 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class VariableSpecifications
@@ -31,6 +32,15 @@ public class VariableSpecifications
         this.specs = new ColumnSpecification[variableNames.size()];
     }
 
+    /**
+     * Returns an empty instance of <code>VariableSpecifications</code>.
+     * @return an empty instance of <code>VariableSpecifications</code>
+     */
+    public static VariableSpecifications empty()
+    {
+        return new VariableSpecifications(Collections.<ColumnIdentifier> emptyList());
+    }
+
     public int size()
     {
         return variableNames.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/AbstractPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/AbstractPrimaryKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/AbstractPrimaryKeyRestrictions.java
new file mode 100644
index 0000000..f137a77
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/AbstractPrimaryKeyRestrictions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+/**
+ * Base class for <code>PrimaryKeyRestrictions</code>.
+ */
+abstract class AbstractPrimaryKeyRestrictions extends AbstractRestriction implements PrimaryKeyRestrictions
+{
+    @Override
+    public final boolean isEmpty()
+    {
+        return getColumnDefs().isEmpty();
+    }
+
+    @Override
+    public final int size()
+    {
+        return getColumnDefs().size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/AbstractRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/AbstractRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/AbstractRestriction.java
new file mode 100644
index 0000000..0ae7b22
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/AbstractRestriction.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+
+/**
+ * Base class for <code>Restriction</code>s
+ */
+abstract class AbstractRestriction  implements Restriction
+{
+    @Override
+    public  boolean isOnToken()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isMultiColumn()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isSlice()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isEQ()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isIN()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isContains()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean hasBound(Bound b)
+    {
+        return true;
+    }
+
+    @Override
+    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+    {
+        return values(options);
+    }
+
+    @Override
+    public boolean isInclusive(Bound b)
+    {
+        return true;
+    }
+
+    protected static ByteBuffer validateIndexedValue(ColumnSpecification columnSpec,
+                                                     ByteBuffer value)
+                                                     throws InvalidRequestException
+    {
+        checkNotNull(value, "Unsupported null value for indexed column %s", columnSpec.name);
+        checkFalse(value.remaining() > 0xFFFF, "Index expression values may not be larger than 64K");
+        return value;
+    }
+
+    /**
+     * Checks if the specified term is using the specified function.
+     *
+     * @param term the term to check
+     * @param ksName the function keyspace name
+     * @param functionName the function name
+     * @return <code>true</code> if the specified term is using the specified function, <code>false</code> otherwise.
+     */
+    protected static final boolean usesFunction(Term term, String ksName, String functionName)
+    {
+        return term != null && term.usesFunction(ksName, functionName);
+    }
+
+    /**
+     * Checks if one of the specified terms is using the specified function.
+     *
+     * @param terms the terms to check
+     * @param ksName the function keyspace name
+     * @param functionName the function name
+     * @return <code>true</code> if one of the specified terms is using the specified function, <code>false</code> otherwise.
+     */
+    protected static final boolean usesFunction(List<Term> terms, String ksName, String functionName)
+    {
+        if (terms != null)
+            for (Term value : terms)
+                if (usesFunction(value, ksName, functionName))
+                    return true;
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
new file mode 100644
index 0000000..8a57292
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A <code>PrimaryKeyRestrictions</code> which forwards all its method calls to another 
+ * <code>PrimaryKeyRestrictions</code>. Subclasses should override one or more methods to modify the behavior 
+ * of the backing <code>PrimaryKeyRestrictions</code> as desired per the decorator pattern. 
+ */
+abstract class ForwardingPrimaryKeyRestrictions implements PrimaryKeyRestrictions
+{
+    /**
+     * Returns the backing delegate instance that methods are forwarded to.
+     * @return the backing delegate instance that methods are forwarded to.
+     */
+    protected abstract PrimaryKeyRestrictions getDelegate();
+
+    @Override
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        return getDelegate().usesFunction(ksName, functionName);
+    }
+
+    @Override
+    public Collection<ColumnDefinition> getColumnDefs()
+    {
+        return getDelegate().getColumnDefs();
+    }
+
+    @Override
+    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
+    {
+        return getDelegate().mergeWith(restriction);
+    }
+
+    @Override
+    public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
+    {
+        return getDelegate().hasSupportingIndex(secondaryIndexManager);
+    }
+
+    @Override
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+    {
+        return getDelegate().values(options);
+    }
+
+    @Override
+    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+    {
+        return getDelegate().valuesAsComposites(options);
+    }
+
+    @Override
+    public List<ByteBuffer> bounds(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        return getDelegate().bounds(bound, options);
+    }
+
+    @Override
+    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        return getDelegate().boundsAsComposites(bound, options);
+    }
+
+    @Override
+    public boolean isInclusive(Bound bound)
+    {
+        return getDelegate().isInclusive(bound); // forward the bound unchanged, as hasBound() and bounds() do
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return getDelegate().isEmpty();
+    }
+
+    @Override
+    public int size()
+    {
+        return getDelegate().size();
+    }
+
+    @Override
+    public boolean isOnToken()
+    {
+        return getDelegate().isOnToken();
+    }
+
+    @Override
+    public boolean isSlice()
+    {
+        return getDelegate().isSlice();
+    }
+
+    @Override
+    public boolean isEQ()
+    {
+        return getDelegate().isEQ();
+    }
+
+    @Override
+    public boolean isIN()
+    {
+        return getDelegate().isIN();
+    }
+
+    @Override
+    public boolean isContains()
+    {
+        return getDelegate().isContains();
+    }
+
+    @Override
+    public boolean isMultiColumn()
+    {
+        return getDelegate().isMultiColumn();
+    }
+
+    @Override
+    public boolean hasBound(Bound b)
+    {
+        return getDelegate().hasBound(b);
+    }
+
+    @Override
+    public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                     QueryOptions options) throws InvalidRequestException
+    {
+        getDelegate().addIndexExpressionTo(expressions, options);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
new file mode 100644
index 0000000..e3b3c4c
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Tuples;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+public abstract class MultiColumnRestriction extends AbstractPrimaryKeyRestrictions
+{
+    protected final CType ctype;
+
+    /**
+     * The columns to which the restriction applies.
+     */
+    protected final List<ColumnDefinition> columnDefs;
+
+    /**
+     * Creates a restriction on the specified columns.
+     *
+     * @param ctype the composite type, kept in the <code>ctype</code> field
+     * @param columnDefs the columns to which the restriction applies
+     */
+    public MultiColumnRestriction(CType ctype, List<ColumnDefinition> columnDefs)
+    {
+        this.columnDefs = columnDefs;
+        this.ctype = ctype;
+    }
+
+    /**
+     * Identifies this restriction as a multi-column one.
+     *
+     * @return always <code>true</code>
+     */
+    @Override
+    public boolean isMultiColumn()
+    {
+        // Every restriction of this hierarchy spans a tuple of columns.
+        return true;
+    }
+
+    /**
+     * Returns the columns to which this restriction applies.
+     *
+     * @return the <code>columnDefs</code> list itself (no copy is made)
+     */
+    @Override
+    public Collection<ColumnDefinition> getColumnDefs()
+    {
+        final Collection<ColumnDefinition> restrictedColumns = columnDefs;
+        return restrictedColumns;
+    }
+
+    /**
+     * Returns the values of this restriction, obtained by converting the result of
+     * <code>valuesAsComposites(options)</code> with <code>Composites.toByteBuffers</code>.
+     *
+     * @param options the query options
+     * @return the converted composites
+     * @throws InvalidRequestException if the composites cannot be computed
+     */
+    @Override
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+    {
+        final List<Composite> composites = valuesAsComposites(options);
+        return Composites.toByteBuffers(composites);
+    }
+
+    /**
+     * Merges this restriction with the specified one.
+     * <p>
+     * Only multi-column restrictions are accepted; the actual merge is left to
+     * <code>doMergeWith(PrimaryKeyRestrictions)</code>.
+     *
+     * @param otherRestriction the restriction to merge with
+     * @return the result of <code>doMergeWith</code>
+     * @throws InvalidRequestException if the other restriction is not a multi-column one or if the merge fails
+     */
+    @Override
+    public final PrimaryKeyRestrictions mergeWith(Restriction otherRestriction) throws InvalidRequestException
+    {
+        final boolean otherIsMultiColumn = otherRestriction.isMultiColumn();
+        checkTrue(otherIsMultiColumn,
+                  "Mixing single column relations and multi column relations on clustering columns is not allowed");
+
+        final PrimaryKeyRestrictions other = (PrimaryKeyRestrictions) otherRestriction;
+        return doMergeWith(other);
+    }
+
+    /**
+     * Merges this restriction with another one, after <code>mergeWith(Restriction)</code> has checked that
+     * the other restriction is a multi-column one.
+     *
+     * @param otherRestriction the multi-column restriction to merge with
+     * @return the restriction resulting from the merge
+     * @throws InvalidRequestException if the two restrictions cannot be merged
+     */
+    protected abstract PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException;
+
+    /**
+     * Returns the names of the columns that are specified within both this <code>Restrictions</code> and the
+     * other one, as a comma separated <code>String</code> (e.g. "a, b").
+     * <p>
+     * The names are listed in the iteration order of <code>getColumnDefs()</code>, so the error messages built
+     * from the result do not change from one call to the next.
+     *
+     * @param otherRestrictions the other restrictions
+     * @return the names of the columns that are specified within both this <code>Restrictions</code> and the
+     * other one, separated by ", "; an empty <code>String</code> if there is no such column.
+     */
+    protected final String getColumnsInCommons(Restrictions otherRestrictions)
+    {
+        // A LinkedHashSet keeps the columns in the order in which getColumnDefs() returns them;
+        // a plain HashSet would list them in an arbitrary order.
+        Set<ColumnDefinition> commons = new LinkedHashSet<>(getColumnDefs());
+        commons.retainAll(otherRestrictions.getColumnDefs());
+        StringBuilder builder = new StringBuilder();
+        for (ColumnDefinition columnDefinition : commons)
+        {
+            // The separator goes in front of every name but the first one.
+            if (builder.length() != 0)
+                builder.append(", ");
+            builder.append(columnDefinition.name);
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Checks if at least one of the restricted columns has a secondary index that supports this restriction.
+     *
+     * @param indexManager the manager used to look up the index of each column
+     * @return <code>true</code> as soon as one column has an index accepted by
+     * <code>isSupportedBy(SecondaryIndex)</code>, <code>false</code> otherwise.
+     */
+    @Override
+    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    {
+        Iterator<ColumnDefinition> columns = columnDefs.iterator();
+        while (columns.hasNext())
+        {
+            SecondaryIndex candidate = indexManager.getIndexForColumn(columns.next().name.bytes);
+
+            // Columns without index cannot help.
+            if (candidate == null)
+                continue;
+
+            if (isSupportedBy(candidate))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks if this type of restriction is supported by the specified index.
+     *
+     * @param index the secondary index
+     * @return <code>true</code> if this type of restriction is supported by the specified index,
+     * <code>false</code> otherwise.
+     */
+    protected abstract boolean isSupportedBy(SecondaryIndex index);
+
+    public static class EQ  extends MultiColumnRestriction
+    {
+        protected final Term value;
+
+        public EQ(CType ctype, List<ColumnDefinition> columnDefs, Term value)
+        {
+            super(ctype, columnDefs);
+            this.value = value;
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return usesFunction(value, ksName, functionName);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("EQ(%s)", value);
+        }
+
+        @Override
+        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException
+        {
+            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal",
+                                 getColumnsInCommons(otherRestriction));
+        }
+
+        @Override
+        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+        {
+            return Collections.singletonList(compositeValue(options));
+        }
+
+        @Override
+        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+        {
+            Composite prefix = compositeValue(options);
+            return Collections.singletonList(ctype.size() > prefix.size() && bound.isEnd()
+                                             ? prefix.end()
+                                             : prefix);
+        }
+
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            return index.supportsOperator(Operator.EQ);
+        }
+
+        private Composite compositeValue(QueryOptions options) throws InvalidRequestException
+        {
+            CBuilder builder = ctype.builder();
+            Tuples.Value t = ((Tuples.Value) value.bind(options));
+            List<ByteBuffer> values = t.getElements();
+            for (int i = 0; i < values.size(); i++)
+            {
+                ByteBuffer component = checkNotNull(values.get(i),
+                                                    "Invalid null value in condition for column %s",
+                                                    columnDefs.get(i).name);
+                builder.add(component);
+            }
+
+            return builder.build();
+        }
+
+        @Override
+        public final void addIndexExpressionTo(List<IndexExpression> expressions,
+                                               QueryOptions options) throws InvalidRequestException
+        {
+            Tuples.Value t = ((Tuples.Value) value.bind(options));
+            List<ByteBuffer> values = t.getElements();
+            for (int i = 0; i < values.size(); i++)
+            {
+                ColumnDefinition columnDef = columnDefs.get(i);
+                ByteBuffer component = validateIndexedValue(columnDef, values.get(i));
+                expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, component));
+            }
+        }
+    }
+
+    /**
+     * Base class of the multi-column IN restrictions.
+     * <p>
+     * Sub-classes only provide the IN values, one list of components per tuple, through
+     * <code>splitValues(QueryOptions)</code>.
+     */
+    public abstract static class IN extends MultiColumnRestriction
+    {
+        public IN(CType ctype, List<ColumnDefinition> columnDefs)
+        {
+            super(ctype, columnDefs);
+        }
+
+        @Override
+        public boolean isIN()
+        {
+            return true;
+        }
+
+        /**
+         * Returns the IN values as composites, sorted with the composite type and without duplicates.
+         *
+         * @param options the query options
+         * @return the sorted, de-duplicated composites
+         * @throws InvalidRequestException if one of the components is <code>null</code>
+         */
+        @Override
+        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+        {
+            CBuilder builder = ctype.builder();
+            List<List<ByteBuffer>> splitInValues = splitValues(options);
+            // The IN query might not have listed the values in comparator order, so we need to re-sort
+            // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
+            TreeSet<Composite> inValues = new TreeSet<>(ctype);
+            for (List<ByteBuffer> components : splitInValues)
+            {
+                checkNoNullComponent(components);
+                inValues.add(builder.buildWith(components));
+            }
+            return new ArrayList<>(inValues);
+        }
+
+        /**
+         * Returns the IN values as bounds, sorted with the composite type and without duplicates.
+         * <p>
+         * For the end bound, <code>end()</code> of the composite is used whenever the tuple has fewer
+         * components than the builder can still accept.
+         *
+         * @param bound the bound for which the composites are requested
+         * @param options the query options
+         * @return the sorted, de-duplicated bounds
+         * @throws InvalidRequestException if one of the components is <code>null</code>
+         */
+        @Override
+        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+        {
+            CBuilder builder = ctype.builder();
+            List<List<ByteBuffer>> splitInValues = splitValues(options);
+            // The IN query might not have listed the values in comparator order, so we need to re-sort
+            // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
+            TreeSet<Composite> inValues = new TreeSet<>(ctype);
+            for (List<ByteBuffer> components : splitInValues)
+            {
+                checkNoNullComponent(components);
+
+                Composite prefix = builder.buildWith(components);
+                inValues.add(bound.isEnd() && builder.remainingCount() - components.size() > 0
+                             ? prefix.end()
+                             : prefix);
+            }
+            return new ArrayList<>(inValues);
+        }
+
+        /**
+         * Adds the index expression of this restriction to the specified list.
+         * <p>
+         * Only an IN with exactly one tuple holding exactly one value is accepted; it is added as an EQ
+         * expression on the first column.
+         *
+         * @param expressions the list to add the expression to
+         * @param options the query options
+         * @throws InvalidRequestException if the IN has more than one tuple or more than one value
+         */
+        @Override
+        public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                         QueryOptions options) throws InvalidRequestException
+        {
+            List<List<ByteBuffer>> splitInValues = splitValues(options);
+            checkTrue(splitInValues.size() == 1, "IN restrictions are not supported on indexed columns");
+
+            List<ByteBuffer> values = splitInValues.get(0);
+            checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
+
+            ColumnDefinition columnDef = columnDefs.get(0);
+            ByteBuffer component = validateIndexedValue(columnDef, values.get(0));
+            expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, component));
+        }
+
+        /**
+         * An IN cannot be merged with anything: this method always throws.
+         */
+        @Override
+        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestrictions) throws InvalidRequestException
+        {
+            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a IN",
+                                 getColumnsInCommons(otherRestrictions));
+        }
+
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            return index.supportsOperator(Operator.IN);
+        }
+
+        /**
+         * Returns the IN values, one list of components per tuple.
+         *
+         * @param options the query options
+         * @return the components of each tuple
+         * @throws InvalidRequestException if the values cannot be retrieved
+         */
+        protected abstract List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException;
+
+        /**
+         * Rejects the tuple if one of its components is <code>null</code>.
+         * <p>
+         * The column name is passed as a format argument rather than concatenated into the template,
+         * so a '%' within a column name cannot be taken for a format specifier.
+         */
+        private void checkNoNullComponent(List<ByteBuffer> components) throws InvalidRequestException
+        {
+            for (int i = 0; i < components.size(); i++)
+                checkNotNull(components.get(i), "Invalid null value in condition for column %s", columnDefs.get(i).name);
+        }
+    }
+
+    /**
+     * An IN restriction whose values are given as a list of terms, one term per tuple.
+     * For example: "SELECT ... WHERE (a, b, c) IN ((1, 2, 3), (4, 5, 6))" or "WHERE (a, b, c) IN (?, ?)"
+     */
+    public static class InWithValues extends MultiColumnRestriction.IN
+    {
+        /**
+         * The IN values; once bound, each of them is used as a <code>Term.MultiItemTerminal</code>.
+         */
+        protected final List<Term> values;
+
+        public InWithValues(CType ctype, List<ColumnDefinition> columnDefs, List<Term> values)
+        {
+            super(ctype, columnDefs);
+            this.values = values;
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            final boolean valuesUseFunction = usesFunction(values, ksName, functionName);
+            return valuesUseFunction;
+        }
+
+        @Override
+        public String toString()
+        {
+            final String description = String.format("IN(%s)", values);
+            return description;
+        }
+
+        /**
+         * Binds each term and collects the elements of the resulting terminals, in the order of the terms.
+         */
+        @Override
+        protected List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException
+        {
+            final List<List<ByteBuffer>> elementsPerTuple = new ArrayList<>(values.size());
+            final Iterator<Term> terms = values.iterator();
+            while (terms.hasNext())
+            {
+                Term.MultiItemTerminal bound = (Term.MultiItemTerminal) terms.next().bind(options);
+                elementsPerTuple.add(bound.getElements());
+            }
+            return elementsPerTuple;
+        }
+    }
+
+    /**
+     * An IN restriction whose whole set of tuples is provided through a single bind marker.
+     * For example: "SELECT ... WHERE (a, b, c) IN ?"
+     */
+    public static class InWithMarker extends MultiColumnRestriction.IN
+    {
+        /**
+         * The marker; it is used as a <code>Tuples.InMarker</code> when the values are requested.
+         */
+        protected final AbstractMarker marker;
+
+        public InWithMarker(CType ctype, List<ColumnDefinition> columnDefs, AbstractMarker marker)
+        {
+            super(ctype, columnDefs);
+            this.marker = marker;
+        }
+
+        /**
+         * @return always <code>false</code>
+         */
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "IN ?";
+        }
+
+        /**
+         * Binds the marker and returns the split values of the result; a <code>null</code> result is rejected.
+         */
+        @Override
+        protected List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException
+        {
+            final Tuples.InValue boundValue = ((Tuples.InMarker) marker).bind(options);
+            return checkNotNull(boundValue, "Invalid null value for IN restriction").getSplitValues();
+        }
+    }
+
+    /**
+     * A multi-column slice restriction; its start and/or end bounds are held in a <code>TermSlice</code>.
+     */
+    public static class Slice extends MultiColumnRestriction
+    {
+        private final TermSlice slice;
+
+        /**
+         * Creates a slice made of a single bound.
+         */
+        public Slice(CType ctype, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
+        {
+            this(ctype, columnDefs, TermSlice.newInstance(bound, inclusive, term));
+        }
+
+        private Slice(CType ctype, List<ColumnDefinition> columnDefs, TermSlice slice)
+        {
+            super(ctype, columnDefs);
+            this.slice = slice;
+        }
+
+        @Override
+        public boolean isSlice()
+        {
+            return true;
+        }
+
+        /**
+         * Not available for slices: always throws <code>UnsupportedOperationException</code>.
+         */
+        @Override
+        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+        {
+            final List<Composite> composites = boundsAsComposites(b, options);
+            return Composites.toByteBuffers(composites);
+        }
+
+        /**
+         * Returns the requested bound as a single composite.
+         * <p>
+         * When the slice has no term for the bound, an empty composite is returned (its <code>end()</code> for
+         * the end bound if the builder still accepts components). Otherwise the composite is built from the
+         * components of the bound term and completed with the EOC computed by <code>eocFor</code>.
+         */
+        @Override
+        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+        {
+            CBuilder compositeBuilder = ctype.builder();
+            Iterator<ColumnDefinition> remainingColumns = columnDefs.iterator();
+            ColumnDefinition firstColumn = remainingColumns.next();
+            // A hack to preserve pre-6875 behavior for tuple-notation slices where the comparator mixes ASCENDING
+            // and DESCENDING orders.  The bound selected for the first component is re-used for all the following
+            // components, even if they don't match the first component's reversal/non-reversal.  Note that this
+            // does *not* guarantee correct query results, it just preserves the previous behavior.
+            Bound firstComponentBound = firstColumn.isReversedType() ? bound.reverse() : bound;
+
+            if (!hasBound(firstComponentBound))
+            {
+                Composite unbounded = compositeBuilder.build();
+                if (compositeBuilder.remainingCount() > 0 && bound.isEnd())
+                    unbounded = unbounded.end();
+
+                return Collections.singletonList(unbounded);
+            }
+
+            List<ByteBuffer> components = componentBounds(firstComponentBound, options);
+
+            ByteBuffer component = checkNotNull(components.get(firstColumn.position()),
+                                                "Invalid null value in condition for column %s",
+                                                firstColumn.name);
+            compositeBuilder.add(component);
+
+            while (remainingColumns.hasNext())
+            {
+                ColumnDefinition column = remainingColumns.next();
+
+                // The bound term may hold fewer components than there are restricted columns.
+                if (column.position() >= components.size())
+                    break;
+
+                component = checkNotNull(components.get(column.position()),
+                                         "Invalid null value in condition for column %s",
+                                         column.name);
+                compositeBuilder.add(component);
+            }
+
+            Composite.EOC eoc = eocFor(this, bound, firstComponentBound);
+            Composite result = compositeBuilder.build().withEOC(eoc);
+            return Collections.singletonList(result);
+        }
+
+        /**
+         * Slices cannot be turned into index expressions: always throws.
+         */
+        @Override
+        public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                         QueryOptions options) throws InvalidRequestException
+        {
+            throw invalidRequest("Slice restrictions are not supported on indexed columns which are part of a multi column relation");
+        }
+
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            final boolean supported = slice.isSupportedBy(index);
+            return supported;
+        }
+
+        /**
+         * Selects the EOC of a bound: NONE (inclusive) or END (exclusive) for a start bound,
+         * END (inclusive) or START (exclusive) otherwise.
+         */
+        private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
+        {
+            if (eocBound.isStart())
+            {
+                if (r.isInclusive(inclusiveBound))
+                    return Composite.EOC.NONE;
+
+                return Composite.EOC.END;
+            }
+
+            if (r.isInclusive(inclusiveBound))
+                return Composite.EOC.END;
+
+            return Composite.EOC.START;
+        }
+
+        @Override
+        public boolean hasBound(Bound b)
+        {
+            final boolean present = slice.hasBound(b);
+            return present;
+        }
+
+        /**
+         * Checks the term of each existing bound, the start bound first.
+         */
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            if (slice.hasBound(Bound.START) && usesFunction(slice.bound(Bound.START), ksName, functionName))
+                return true;
+
+            return slice.hasBound(Bound.END) && usesFunction(slice.bound(Bound.END), ksName, functionName);
+        }
+
+        @Override
+        public boolean isInclusive(Bound b)
+        {
+            final boolean inclusive = slice.isInclusive(b);
+            return inclusive;
+        }
+
+        /**
+         * Merges this slice with another slice that restricts the opposite bound.
+         * <p>
+         * The merged slice keeps the column definitions of the restriction with the larger <code>size()</code>
+         * (this one in case of a tie).
+         */
+        @Override
+        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException
+        {
+            final String commonColumns = getColumnsInCommons(otherRestriction);
+
+            checkTrue(otherRestriction.isSlice(),
+                      "Column \"%s\" cannot be restricted by both an equality and an inequality relation",
+                      commonColumns);
+
+            Slice other = (Slice) otherRestriction;
+
+            checkFalse(hasBound(Bound.START) && other.hasBound(Bound.START),
+                       "More than one restriction was found for the start bound on %s",
+                       commonColumns);
+            checkFalse(hasBound(Bound.END) && other.hasBound(Bound.END),
+                       "More than one restriction was found for the end bound on %s",
+                       commonColumns);
+
+            List<ColumnDefinition> mergedColumnDefs;
+            if (size() >= other.size())
+                mergedColumnDefs = columnDefs;
+            else
+                mergedColumnDefs = other.columnDefs;
+
+            return new Slice(ctype, mergedColumnDefs, slice.merge(other.slice));
+        }
+
+        @Override
+        public String toString()
+        {
+            return new StringBuilder("SLICE").append(slice).toString();
+        }
+
+        /**
+         * Similar to bounds(), but returns one ByteBuffer per component of the bound instead of a single
+         * ByteBuffer representing the entire bound.
+         *
+         * @param b the bound type
+         * @param options the query options
+         * @return one ByteBuffer per component of the bound
+         * @throws InvalidRequestException if the components cannot be retrieved
+         */
+        private List<ByteBuffer> componentBounds(Bound b, QueryOptions options) throws InvalidRequestException
+        {
+            return ((Tuples.Value) slice.bound(b).bind(options)).getElements();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
new file mode 100644
index 0000000..5f977b7
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.util.List;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * The restrictions applying to one part of the primary key (partition key or clustering key).
+ * <p>
+ * It is both a single <code>Restriction</code> and a set of <code>Restrictions</code>.
+ */
+interface PrimaryKeyRestrictions extends Restriction, Restrictions
+{
+    /**
+     * Merges this restriction with the specified one; the result is again a <code>PrimaryKeyRestrictions</code>.
+     *
+     * @param restriction the restriction to merge with
+     * @return the merged restrictions
+     * @throws InvalidRequestException if the restrictions cannot be merged
+     */
+    @Override
+    PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException;
+
+    /**
+     * Returns the values of these restrictions as composites.
+     *
+     * @param options the query options
+     * @return the values as composites
+     * @throws InvalidRequestException if the values cannot be computed
+     */
+    List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException;
+
+    /**
+     * Returns the specified bound of these restrictions as composites.
+     *
+     * @param bound the requested bound
+     * @param options the query options
+     * @return the bounds as composites
+     * @throws InvalidRequestException if the bounds cannot be computed
+     */
+    List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException;
+}