You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/02 20:09:08 UTC

[4/5] cassandra git commit: Refactor SelectStatement and Restrictions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
new file mode 100644
index 0000000..d0ed193
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A restriction/clause on a column.
+ * The goal of this interface is to group all conditions for a column in a SELECT.
+ */
+public interface Restriction
+{
+    public boolean isOnToken();
+    public boolean isSlice();
+    public boolean isEQ();
+    public boolean isIN();
+    public boolean isContains();
+    public boolean isMultiColumn();
+
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
+
+    /**
+     * Returns <code>true</code> if one of the restrictions uses the specified function.
+     *
+     * @param ksName the keyspace name
+     * @param functionName the function name
+     * @return <code>true</code> if one of the restrictions uses the specified function, <code>false</code> otherwise.
+     */
+    public boolean usesFunction(String ksName, String functionName);
+
+    /**
+     * Checks if the specified bound is set or not.
+     * @param b the bound type
+     * @return <code>true</code> if the specified bound is set, <code>false</code> otherwise
+     */
+    public boolean hasBound(Bound b);
+
+    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException;
+
+    /**
+     * Checks if the specified bound is inclusive or not.
+     * @param b the bound type
+     * @return <code>true</code> if the specified bound is inclusive, <code>false</code> otherwise
+     */
+    public boolean isInclusive(Bound b);
+
+    /**
+     * Merges this restriction with the specified one.
+     *
+     * @param otherRestriction the restriction to merge into this one
+     * @return the restriction resulting from the merge
+     * @throws InvalidRequestException if the restrictions cannot be merged
+     */
+    public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException;
+
+    /**
+     * Checks if the restriction is on indexed columns.
+     *
+     * @param indexManager the index manager
+     * @return <code>true</code> if the restriction is on indexed columns, <code>false</code> otherwise
+     */
+    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
+
+    /**
+     * Adds to the specified list the <code>IndexExpression</code>s corresponding to this <code>Restriction</code>.
+     *
+     * @param expressions the list to add the <code>IndexExpression</code>s to
+     * @param options the query options
+     * @throws InvalidRequestException if this <code>Restriction</code> cannot be converted into
+     * <code>IndexExpression</code>s
+     */
+    public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                     QueryOptions options)
+                                     throws InvalidRequestException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
new file mode 100644
index 0000000..cf2555e
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A set of restrictions.
+ */
+interface Restrictions
+{
+    /**
+     * Returns the column definitions in position order.
+     * @return the column definitions in position order.
+     */
+    public Collection<ColumnDefinition> getColumnDefs();
+
+    /**
+     * Returns <code>true</code> if one of the restrictions uses the specified function.
+     *
+     * @param ksName the keyspace name
+     * @param functionName the function name
+     * @return <code>true</code> if one of the restrictions uses the specified function, <code>false</code> otherwise.
+     */
+    public boolean usesFunction(String ksName, String functionName);
+
+    /**
+     * Checks if the restrictions are on indexed columns.
+     *
+     * @param indexManager the index manager
+     * @return <code>true</code> if the restrictions are on indexed columns, <code>false</code> otherwise
+     */
+    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
+
+    /**
+     * Adds to the specified list the <code>IndexExpression</code>s corresponding to these <code>Restrictions</code>.
+     *
+     * @param expressions the list to add the <code>IndexExpression</code>s to
+     * @param options the query options
+     * @throws InvalidRequestException if these <code>Restrictions</code> cannot be converted into
+     * <code>IndexExpression</code>s
+     */
+    public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                     QueryOptions options)
+                                     throws InvalidRequestException;
+
+    /**
+     * Checks if this <code>Restrictions</code> is empty or not.
+     *
+     * @return <code>true</code> if this <code>Restrictions</code> is empty, <code>false</code> otherwise.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns the number of columns that have a restriction.
+     *
+     * @return the number of columns that have a restriction.
+     */
+    public int size();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/ReversedPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/ReversedPrimaryKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/ReversedPrimaryKeyRestrictions.java
new file mode 100644
index 0000000..9b33161
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ReversedPrimaryKeyRestrictions.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * <code>PrimaryKeyRestrictions</code> decorator that reverses the slices.
+ */
+final class ReversedPrimaryKeyRestrictions extends ForwardingPrimaryKeyRestrictions
+{
+    /**
+     * The decorated restrictions (assigned once in the constructor, never reassigned).
+     */
+    private final PrimaryKeyRestrictions restrictions;
+
+    public ReversedPrimaryKeyRestrictions(PrimaryKeyRestrictions restrictions)
+    {
+        this.restrictions = restrictions;
+    }
+
+    @Override
+    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
+    {
+        return new ReversedPrimaryKeyRestrictions(this.restrictions.mergeWith(restriction)); // re-wrap so the merged result stays reversed
+    }
+
+    @Override
+    public List<ByteBuffer> bounds(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        List<ByteBuffer> buffers = restrictions.bounds(bound.reverse(), options); // ask the delegate for the opposite bound
+        Collections.reverse(buffers); // NOTE(review): reverses the delegate's list in place - confirm it is a fresh, mutable list
+        return buffers;
+    }
+
+    @Override
+    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        List<Composite> composites = restrictions.boundsAsComposites(bound.reverse(), options); // ask the delegate for the opposite bound
+        Collections.reverse(composites); // NOTE(review): reverses the delegate's list in place - confirm it is a fresh, mutable list
+        return composites;
+    }
+
+    @Override
+    public boolean isInclusive(Bound bound)
+    {
+        return this.restrictions.isInclusive(bound.reverse()); // inclusiveness is read from the delegate's opposite bound
+    }
+
+    @Override
+    protected PrimaryKeyRestrictions getDelegate()
+    {
+        return this.restrictions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
new file mode 100644
index 0000000..3858cdc
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composite.EOC;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.composites.CompositesBuilder;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+
+/**
+ * A set of single column restrictions on a primary key part (partition key or clustering key).
+ */
+final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
+{
+    /**
+     * The composite type.
+     */
+    private final CType ctype;
+
+    /**
+     * The restrictions.
+     */
+    private final SingleColumnRestrictions restrictions;
+
+    /**
+     * <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code> otherwise.
+     */
+    private boolean eq;
+
+    /**
+     * <code>true</code> if the restrictions are corresponding to an IN, <code>false</code> otherwise.
+     */
+    private boolean in;
+
+    /**
+     * <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code> otherwise.
+     */
+    private boolean slice;
+
+    /**
+     * <code>true</code> if the restrictions are corresponding to a Contains, <code>false</code> otherwise.
+     */
+    private boolean contains;
+
+    public SingleColumnPrimaryKeyRestrictions(CType ctype)
+    {
+        this.ctype = ctype;
+        this.restrictions = new SingleColumnRestrictions();
+        this.eq = true;
+    }
+
+    private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
+                                               SingleColumnRestriction restriction) throws InvalidRequestException
+    { // Builds the set obtained by adding the new restriction to the given set, validating the column ordering.
+        this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
+        this.ctype = primaryKeyRestrictions.ctype;
+
+        if (!primaryKeyRestrictions.isEmpty())
+        {
+            ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
+            ColumnDefinition newColumn = restriction.getColumnDef();
+
+            checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position() > lastColumn.position(), // nothing may follow a slice
+                       "Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
+                       newColumn.name,
+                       lastColumn.name);
+
+            if (newColumn.position() < lastColumn.position()) // the new column precedes an already restricted one
+                checkFalse(restriction.isSlice(),
+                           "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
+                           restrictions.nextColumn(newColumn).name,
+                           newColumn.name);
+        }
+
+        if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
+            this.slice = true;
+        else if (restriction.isContains() || primaryKeyRestrictions.isContains())
+            this.contains = true;
+        else if (restriction.isIN() || primaryKeyRestrictions.isIN()) // an IN already in the set must survive a later EQ merge
+            this.in = true;
+        else
+            this.eq = true;
+    }
+
+    @Override
+    public boolean isSlice()
+    {
+        return slice;
+    }
+
+    @Override
+    public boolean isEQ()
+    {
+        return eq;
+    }
+
+    @Override
+    public boolean isIN()
+    {
+        return in;
+    }
+
+    @Override
+    public boolean isOnToken()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isContains()
+    {
+        return contains;
+    }
+
+    @Override
+    public boolean isMultiColumn()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        return restrictions.usesFunction(ksName, functionName);
+    }
+
+    @Override
+    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
+    {
+        if (restriction.isMultiColumn())
+        {
+            checkTrue(isEmpty(),
+                      "Mixing single column relations and multi column relations on clustering columns is not allowed");
+            return (PrimaryKeyRestrictions) restriction;
+        }
+
+        if (restriction.isOnToken())
+        {
+            checkTrue(isEmpty(), "Columns \"%s\" cannot be restricted by both a normal relation and a token relation",
+                      ((TokenRestriction) restriction).getColumnNamesAsString());
+            return (PrimaryKeyRestrictions) restriction;
+        }
+
+        return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
+    }
+
+    @Override
+    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+    {
+        CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
+        for (ColumnDefinition def : restrictions.getColumnDefs())
+        {
+            Restriction r = restrictions.getRestriction(def);
+            assert !r.isSlice(); // slices expose bounds, not values
+
+            List<ByteBuffer> values = r.values(options);
+
+            if (values.isEmpty())
+                return null; // NOTE(review): values() passes this result straight to Composites.toByteBuffers - confirm null is handled
+
+            builder.addEachElementToAll(values);
+            checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        CBuilder builder = ctype.builder();
+        List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
+
+        CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
+        // The end-of-component of composite doesn't depend on whether the
+        // component type is reversed or not (i.e. the ReversedType is applied
+        // to the component comparator but not to the end-of-component itself),
+        // it only depends on whether the slice is reversed
+        int keyPosition = 0;
+        for (ColumnDefinition def : defs)
+        {
+            // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
+            // So if we're doing a reverse slice, we must invert the bounds when giving them as start and end of the slice filter.
+            // But if the actual comparator itself is reversed, we must invert the bounds too.
+            Bound b = !def.isReversedType() ? bound : bound.reverse();
+            Restriction r = restrictions.getRestriction(def);
+            if (keyPosition != def.position() || r.isContains())
+            {
+                EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
+                return compositeBuilder.buildWithEOC(eoc);
+            }
+            if (r.isSlice())
+            {
+                if (!r.hasBound(b))
+                {
+                    // There wasn't any non-EQ relation on that key, so we select all records having the preceding component as prefix.
+                    // For composites, if there was a preceding component and we're computing the end, we must change the last component's
+                    // End-Of-Component, otherwise we would be selecting only one record.
+                    EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
+                    return compositeBuilder.buildWithEOC(eoc);
+                }
+
+                ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null clustering key part %s", r);
+                compositeBuilder.addElementToAll(value);
+                Composite.EOC eoc = eocFor(r, bound, b);
+                return compositeBuilder.buildWithEOC(eoc);
+            }
+
+            compositeBuilder.addEachElementToAll(r.values(options));
+
+            checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part %s", def.name);
+            keyPosition++;
+        }
+        // Means no relation at all or everything was an equal
+        // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
+        // it would be harmless to do it. However, we use this method for the partition key too. And when a query
+        // with 2ndary index is done, and with the partition provided with an EQ, we'll end up here, and in that
+        // case using the eoc would be bad, since for the random partitioner we have no guarantee that
+        // prefix.end() will sort after prefix (see #5240).
+        EOC eoc = bound.isEnd() && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
+        return compositeBuilder.buildWithEOC(eoc);
+    }
+
+    @Override
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+    {
+        return Composites.toByteBuffers(valuesAsComposites(options));
+    }
+
+    @Override
+    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+    {
+        return Composites.toByteBuffers(boundsAsComposites(b, options));
+    }
+
+    private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
+    {
+        if (eocBound.isStart())
+            return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
+
+        return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
+    }
+
+    @Override
+    public boolean hasBound(Bound b)
+    {
+        if (isEmpty())
+            return false;
+        return restrictions.lastRestriction().hasBound(b);
+    }
+
+    @Override
+    public boolean isInclusive(Bound b)
+    {
+        if (isEmpty())
+            return false;
+        return restrictions.lastRestriction().isInclusive(b);
+    }
+
+    @Override
+    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    {
+        return restrictions.hasSupportingIndex(indexManager);
+    }
+
+    @Override
+    public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
+    {
+        restrictions.addIndexExpressionTo(expressions, options);
+    }
+
+    @Override
+    public Collection<ColumnDefinition> getColumnDefs()
+    {
+        return restrictions.getColumnDefs();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
new file mode 100644
index 0000000..0f0f9c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+public abstract class SingleColumnRestriction extends AbstractRestriction
+{
+    /**
+     * The definition of the column to which the restriction applies.
+     */
+    protected final ColumnDefinition columnDef;
+
+    public SingleColumnRestriction(ColumnDefinition columnDef)
+    {
+        this.columnDef = columnDef;
+    }
+
+    /**
+     * Returns the definition of the column with which this restriction is associated.
+     * @return the definition of the column with which this restriction is associated
+     */
+    public ColumnDefinition getColumnDef()
+    {
+        return columnDef;
+    }
+
+    @Override
+    public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                     QueryOptions options) throws InvalidRequestException
+    {
+        List<ByteBuffer> values = values(options);
+        checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
+
+        ByteBuffer value = validateIndexedValue(columnDef, values.get(0));
+        expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, value));
+    }
+
+    @Override
+    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    {
+        SecondaryIndex index = indexManager.getIndexForColumn(columnDef.name.bytes);
+        return index != null && isSupportedBy(index);
+    }
+
+    /**
+     * Checks if this type of restriction is supported by the specified index.
+     *
+     * @param index the Secondary index
+     * @return <code>true</code> if this type of restriction is supported by the specified index,
+     * <code>false</code> otherwise.
+     */
+    protected abstract boolean isSupportedBy(SecondaryIndex index);
+
+    public static final class EQ extends SingleColumnRestriction
+    {
+        private final Term value;
+
+        public EQ(ColumnDefinition columnDef, Term value)
+        {
+            super(columnDef);
+            this.value = value;
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return usesFunction(value, ksName, functionName);
+        }
+
+        public boolean isEQ()
+        {
+            return true;
+        }
+
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            return Collections.singletonList(value.bindAndGet(options));
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("EQ(%s)", value);
+        }
+
+        @Override
+        public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
+        {
+            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal", columnDef.name);
+        }
+
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            return index.supportsOperator(Operator.EQ);
+        }
+    }
+
+    public static abstract class IN extends SingleColumnRestriction
+    {
+        public IN(ColumnDefinition columnDef)
+        {
+            super(columnDef);
+        }
+
+        @Override
+        public final boolean isIN()
+        {
+            return true;
+        }
+
+        @Override
+        public final Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
+        {
+            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a IN", columnDef.name);
+        }
+
+        @Override
+        protected final boolean isSupportedBy(SecondaryIndex index)
+        {
+            return index.supportsOperator(Operator.IN);
+        }
+    }
+
+    public static class InWithValues extends IN
+    {
+        protected final List<Term> values;
+
+        public InWithValues(ColumnDefinition columnDef, List<Term> values)
+        {
+            super(columnDef);
+            this.values = values;
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return usesFunction(values, ksName, functionName);
+        }
+
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            List<ByteBuffer> buffers = new ArrayList<>(values.size());
+            for (Term value : values)
+                buffers.add(value.bindAndGet(options));
+            return buffers;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("IN(%s)", values);
+        }
+    }
+
+    /**
+     * An IN restriction whose whole list of values is supplied through a single bind marker
+     * (<code>IN ?</code>).
+     */
+    public static class InWithMarker extends IN
+    {
+        /** The bind marker standing for the list of IN values. */
+        protected final AbstractMarker marker;
+
+        public InWithMarker(ColumnDefinition columnDef, AbstractMarker marker)
+        {
+            super(columnDef);
+            this.marker = marker;
+        }
+
+        /**
+         * Always returns <code>false</code>: this restriction holds a bind marker only.
+         */
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return false;
+        }
+
+        /**
+         * Binds the marker and returns the elements of the resulting multi-item terminal.
+         *
+         * @param options the query options
+         * @return the elements bound to the marker
+         * @throws InvalidRequestException if the marker is bound to <code>null</code>
+         */
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            Term.MultiItemTerminal lval = (Term.MultiItemTerminal) marker.bind(options);
+            if (lval == null)
+                throw new InvalidRequestException("Invalid null value for IN restriction");
+            return lval.getElements();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "IN ?";
+        }
+    }
+
+    /**
+     * A restriction made of inequality relations on a single column: a start bound, an end bound,
+     * or both.
+     */
+    public static class Slice extends SingleColumnRestriction
+    {
+        /** The bound(s) of this restriction. */
+        private final TermSlice slice;
+
+        public Slice(ColumnDefinition columnDef, Bound bound, boolean inclusive, Term term)
+        {
+            super(columnDef);
+            slice = TermSlice.newInstance(bound, inclusive, term);
+        }
+
+        /**
+         * Checks the terms of the bounds that are actually set.
+         */
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return (slice.hasBound(Bound.START) && usesFunction(slice.bound(Bound.START), ksName, functionName))
+                    || (slice.hasBound(Bound.END) && usesFunction(slice.bound(Bound.END), ksName, functionName));
+        }
+
+        @Override
+        public boolean isSlice()
+        {
+            return true;
+        }
+
+        /**
+         * Not supported: a slice is described by its bounds, not by a list of values.
+         */
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasBound(Bound b)
+        {
+            return slice.hasBound(b);
+        }
+
+        /**
+         * Binds the term of the requested bound and returns it as a single-element list.
+         */
+        @Override
+        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+        {
+            return Collections.singletonList(slice.bound(b).bindAndGet(options));
+        }
+
+        @Override
+        public boolean isInclusive(Bound b)
+        {
+            return slice.isInclusive(b);
+        }
+
+        /**
+         * Merges this slice with another slice on the same column.
+         *
+         * @param otherRestriction the restriction to merge with; it must be a slice
+         * @return a new <code>Slice</code> combining the bounds of both restrictions
+         * @throws InvalidRequestException if the other restriction is not a slice or if both
+         * restrictions define the same bound
+         */
+        @Override
+        public Restriction mergeWith(Restriction otherRestriction)
+        throws InvalidRequestException
+        {
+            checkTrue(otherRestriction.isSlice(),
+                      "Column \"%s\" cannot be restricted by both an equality and an inequality relation",
+                      columnDef.name);
+
+            SingleColumnRestriction.Slice otherSlice = (SingleColumnRestriction.Slice) otherRestriction;
+
+            checkFalse(hasBound(Bound.START) && otherSlice.hasBound(Bound.START),
+                       "More than one restriction was found for the start bound on %s", columnDef.name);
+
+            checkFalse(hasBound(Bound.END) && otherSlice.hasBound(Bound.END),
+                       "More than one restriction was found for the end bound on %s", columnDef.name);
+
+            return new Slice(columnDef, slice.merge(otherSlice.slice));
+        }
+
+        /**
+         * Adds one index expression per bound that is set.
+         */
+        @Override
+        public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                         QueryOptions options) throws InvalidRequestException
+        {
+            for (Bound b : Bound.values())
+            {
+                if (hasBound(b))
+                {
+                    ByteBuffer value = validateIndexedValue(columnDef, slice.bound(b).bindAndGet(options));
+                    Operator op = slice.getIndexOperator(b);
+                    // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operation
+                    // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
+                    // use the underlying comparator as is.
+                    op = columnDef.isReversedType() ? op.reverse() : op;
+                    expressions.add(new IndexExpression(columnDef.name.bytes, op, value));
+                }
+            }
+        }
+
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            return slice.isSupportedBy(index);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("SLICE%s", slice);
+        }
+
+        /**
+         * Wraps an already built <code>TermSlice</code>; used when merging two slices.
+         */
+        private Slice(ColumnDefinition columnDef, TermSlice slice)
+        {
+            super(columnDef);
+            this.slice = slice;
+        }
+    }
+
+    /**
+     * A restriction on a collection column. It holds both the CONTAINS and the CONTAINS_KEY
+     * restrictions because we might want to have both of them on the same column.
+     */
+    public static final class Contains extends SingleColumnRestriction
+    {
+        private final List<Term> values = new ArrayList<>(); // for CONTAINS
+        private final List<Term> keys = new ArrayList<>();  // for CONTAINS_KEY
+
+        /**
+         * Creates a restriction holding a single term.
+         *
+         * @param columnDef the restricted column
+         * @param t the term the collection must contain
+         * @param isKey <code>true</code> for a CONTAINS_KEY restriction, <code>false</code> for CONTAINS
+         */
+        public Contains(ColumnDefinition columnDef, Term t, boolean isKey)
+        {
+            super(columnDef);
+            if (isKey)
+                keys.add(t);
+            else
+                values.add(t);
+        }
+
+        /**
+         * Returns the bound CONTAINS values only; the CONTAINS_KEY terms are not included.
+         */
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            return bindAndGet(values, options);
+        }
+
+        @Override
+        public boolean isContains()
+        {
+            return true;
+        }
+
+        /**
+         * Merges this restriction with another <code>Contains</code> restriction.
+         *
+         * @param otherRestriction the restriction to merge with; it must be a contains restriction
+         * @return a new <code>Contains</code> holding the keys and values of both restrictions
+         * @throws InvalidRequestException if the other restriction is not a contains restriction
+         */
+        @Override
+        public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
+        {
+            checkTrue(otherRestriction.isContains(),
+                      "Collection column %s can only be restricted by CONTAINS or CONTAINS KEY",
+                      getColumnDef().name);
+
+            SingleColumnRestriction.Contains newContains = new Contains(getColumnDef());
+
+            copyKeysAndValues(this, newContains);
+            copyKeysAndValues((Contains) otherRestriction, newContains);
+
+            return newContains;
+        }
+
+        /**
+         * Adds one CONTAINS expression per value followed by one CONTAINS_KEY expression per key.
+         */
+        @Override
+        public void addIndexExpressionTo(List<IndexExpression> expressions,
+                                         QueryOptions options)
+                                         throws InvalidRequestException
+        {
+            for (ByteBuffer value : values(options))
+            {
+                validateIndexedValue(columnDef, value);
+                expressions.add(new IndexExpression(columnDef.name.bytes, Operator.CONTAINS, value));
+            }
+            for (ByteBuffer key : keys(options))
+            {
+                validateIndexedValue(columnDef, key);
+                expressions.add(new IndexExpression(columnDef.name.bytes, Operator.CONTAINS_KEY, key));
+            }
+        }
+
+        /**
+         * Returns <code>true</code> as soon as the index supports at least one of the operators
+         * that this restriction actually uses.
+         */
+        @Override
+        protected boolean isSupportedBy(SecondaryIndex index)
+        {
+            boolean supported = false;
+
+            if (numberOfValues() > 0)
+                supported |= index.supportsOperator(Operator.CONTAINS);
+
+            if (numberOfKeys() > 0)
+                supported |= index.supportsOperator(Operator.CONTAINS_KEY);
+
+            return supported;
+        }
+
+        /**
+         * Returns the number of CONTAINS terms.
+         */
+        public int numberOfValues()
+        {
+            return values.size();
+        }
+
+        /**
+         * Returns the number of CONTAINS_KEY terms.
+         */
+        public int numberOfKeys()
+        {
+            return keys.size();
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return usesFunction(values, ksName, functionName) || usesFunction(keys, ksName, functionName);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("CONTAINS(values=%s, keys=%s)", values, keys);
+        }
+
+        @Override
+        public boolean hasBound(Bound b)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isInclusive(Bound b)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Binds the CONTAINS_KEY terms and returns the resulting values.
+         */
+        private List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
+        {
+            return bindAndGet(keys, options);
+        }
+
+        /**
+         * Binds the query options to the specified terms and returns the resulting values.
+         *
+         * @param terms the terms
+         * @param options the query options
+         * @return the value resulting from binding the query options to the specified terms
+         * @throws InvalidRequestException if a problem occurs while binding the query options
+         */
+        private static List<ByteBuffer> bindAndGet(List<Term> terms, QueryOptions options) throws InvalidRequestException
+        {
+            List<ByteBuffer> buffers = new ArrayList<>(terms.size());
+            for (Term value : terms)
+                buffers.add(value.bindAndGet(options));
+            return buffers;
+        }
+
+        /**
+         * Copies the keys and values from the first <code>Contains</code> to the second one.
+         *
+         * @param from the <code>Contains</code> to copy from
+         * @param to the <code>Contains</code> to copy to
+         */
+        private static void copyKeysAndValues(Contains from, Contains to)
+        {
+            to.values.addAll(from.values);
+            to.keys.addAll(from.keys);
+        }
+
+        /**
+         * Creates a restriction without any term; used as the target of a merge.
+         */
+        private Contains(ColumnDefinition columnDef)
+        {
+            super(columnDef);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
new file mode 100644
index 0000000..ec74cc9
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.Contains;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A sorted collection of single column restrictions, holding one (possibly merged)
+ * <code>Restriction</code> per restricted column.
+ */
+final class SingleColumnRestrictions implements Restrictions
+{
+    /**
+     * Orders the columns by position and, for equal positions, by the bytes of their name.
+     */
+    private static final Comparator<ColumnDefinition> COLUMN_DEFINITION_COMPARATOR = new Comparator<ColumnDefinition>()
+    {
+        @Override
+        public int compare(ColumnDefinition column, ColumnDefinition otherColumn)
+        {
+            int byPosition = Integer.compare(column.position(), otherColumn.position());
+            if (byPosition != 0)
+                return byPosition;
+
+            return column.name.bytes.compareTo(otherColumn.name.bytes);
+        }
+    };
+
+    /**
+     * The restriction registered for each column.
+     */
+    protected final TreeMap<ColumnDefinition, Restriction> restrictions;
+
+    /**
+     * Creates an empty set of restrictions sorted with {@link #COLUMN_DEFINITION_COMPARATOR}.
+     */
+    public SingleColumnRestrictions()
+    {
+        this(new TreeMap<ColumnDefinition, Restriction>(COLUMN_DEFINITION_COMPARATOR));
+    }
+
+    /**
+     * Wraps the specified map, which is used as is (no copy is made).
+     */
+    protected SingleColumnRestrictions(TreeMap<ColumnDefinition, Restriction> restrictions)
+    {
+        this.restrictions = restrictions;
+    }
+
+    /**
+     * Asks every restriction, in column order, to append its index expressions.
+     */
+    @Override
+    public final void addIndexExpressionTo(List<IndexExpression> expressions,
+                                           QueryOptions options) throws InvalidRequestException
+    {
+        for (Restriction each : restrictions.values())
+        {
+            each.addIndexExpressionTo(expressions, options);
+        }
+    }
+
+    /**
+     * Returns the key set of the underlying map, i.e. the restricted columns.
+     */
+    @Override
+    public final Set<ColumnDefinition> getColumnDefs()
+    {
+        return restrictions.keySet();
+    }
+
+    /**
+     * Looks up the restriction registered for a column.
+     *
+     * @param columnDef the column definition
+     * @return the restriction of that column, or <code>null</code> if the column is not restricted
+     */
+    public Restriction getRestriction(ColumnDefinition columnDef)
+    {
+        return restrictions.get(columnDef);
+    }
+
+    /**
+     * Returns <code>true</code> if at least one of the restrictions uses the specified function.
+     */
+    @Override
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        for (Restriction each : restrictions.values())
+        {
+            if (each.usesFunction(ksName, functionName))
+                return true;
+        }
+        return false;
+    }
+
+    @Override
+    public final boolean isEmpty()
+    {
+        return restrictions.isEmpty();
+    }
+
+    @Override
+    public final int size()
+    {
+        return restrictions.size();
+    }
+
+    /**
+     * Returns a new set of restrictions made of the current ones plus the specified one.
+     * This instance is left untouched.
+     *
+     * @param restriction the restriction to add
+     * @return the new set of restrictions
+     * @throws InvalidRequestException if the new restriction cannot be added
+     */
+    public SingleColumnRestrictions addRestriction(SingleColumnRestriction restriction) throws InvalidRequestException
+    {
+        // Work on a copy so that the map of this instance is never modified.
+        TreeMap<ColumnDefinition, Restriction> copy = new TreeMap<>(restrictions);
+        TreeMap<ColumnDefinition, Restriction> merged = mergeRestrictions(copy, restriction);
+        return new SingleColumnRestrictions(merged);
+    }
+
+    /**
+     * Stores the restriction in the map, merging it with the one already registered
+     * for the same column, if any. The map is modified in place and returned.
+     */
+    private static TreeMap<ColumnDefinition, Restriction> mergeRestrictions(TreeMap<ColumnDefinition, Restriction> restrictions,
+                                                                            Restriction restriction)
+                                                                            throws InvalidRequestException
+    {
+        ColumnDefinition column = ((SingleColumnRestriction) restriction).getColumnDef();
+        Restriction merged = mergeRestrictions(restrictions.get(column), restriction);
+        restrictions.put(column, merged);
+        return restrictions;
+    }
+
+    /**
+     * Returns <code>true</code> if at least one of the restrictions has a supporting index.
+     */
+    @Override
+    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    {
+        for (Restriction each : restrictions.values())
+            if (each.hasSupportingIndex(indexManager))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Returns the restricted column that comes right after the specified one.
+     *
+     * @param columnDef the column for which the next one needs to be found
+     * @return the column after the specified one
+     * @throws NoSuchElementException if no restricted column follows the specified one
+     */
+    ColumnDefinition nextColumn(ColumnDefinition columnDef)
+    {
+        SortedMap<ColumnDefinition, Restriction> following = restrictions.tailMap(columnDef, false);
+        return following.firstKey();
+    }
+
+    /**
+     * Returns the definition of the last restricted column.
+     *
+     * @return the definition of the last column, or <code>null</code> if there are no restrictions
+     */
+    ColumnDefinition lastColumn()
+    {
+        if (restrictions.isEmpty())
+            return null;
+
+        return restrictions.lastKey();
+    }
+
+    /**
+     * Returns the restriction of the last restricted column.
+     *
+     * @return the last restriction, or <code>null</code> if there are no restrictions
+     */
+    Restriction lastRestriction()
+    {
+        Map.Entry<ColumnDefinition, Restriction> last = restrictions.lastEntry();
+        return last == null ? null : last.getValue();
+    }
+
+    /**
+     * Merges the two specified restrictions.
+     *
+     * @param restriction the first restriction, may be <code>null</code>
+     * @param otherRestriction the second restriction
+     * @return the second restriction if the first one is <code>null</code>, the merged restriction otherwise
+     * @throws InvalidRequestException if the two restrictions cannot be merged
+     */
+    private static Restriction mergeRestrictions(Restriction restriction,
+                                                 Restriction otherRestriction) throws InvalidRequestException
+    {
+        if (restriction == null)
+            return otherRestriction;
+
+        return restriction.mergeWith(otherRestriction);
+    }
+
+    /**
+     * Checks if the restrictions hold more than one CONTAINS or CONTAINS KEY term in total.
+     *
+     * @return <code>true</code> if the restrictions hold more than one CONTAINS or CONTAINS KEY term,
+     * <code>false</code> otherwise
+     */
+    public final boolean hasMultipleContains()
+    {
+        int total = 0;
+        for (Restriction each : restrictions.values())
+        {
+            if (!each.isContains())
+                continue;
+
+            Contains contains = (Contains) each;
+            total += contains.numberOfValues();
+            total += contains.numberOfKeys();
+        }
+        return total > 1;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
new file mode 100644
index 0000000..60c7465
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Relation;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+/**
+ * The restrictions corresponding to the relations specified on the where-clause of CQL query.
+ */
+public final class StatementRestrictions
+{
+    /**
+     * The column family metadata the restrictions are built against.
+     */
+    public final CFMetaData cfm;
+
+    /**
+     * Restrictions on the partition key columns.
+     */
+    private PrimaryKeyRestrictions partitionKeyRestrictions;
+
+    /**
+     * Restrictions on the clustering columns.
+     */
+    private PrimaryKeyRestrictions clusteringColumnsRestrictions;
+
+    /**
+     * Restrictions on the non-primary key columns (i.e. secondary index restrictions).
+     */
+    private SingleColumnRestrictions nonPrimaryKeyRestrictions;
+
+    /**
+     * The restrictions used to build the index expressions.
+     */
+    private final List<Restrictions> indexRestrictions = new ArrayList<>();
+
+    /**
+     * <code>true</code> if a secondary index needs to be queried, <code>false</code> otherwise.
+     */
+    private boolean usesSecondaryIndexing;
+
+    /**
+     * Specifies whether the query will return a range of partition keys.
+     */
+    private boolean isKeyRange;
+
+    /**
+     * Builds a <code>StatementRestrictions</code> that does not hold any restriction.
+     *
+     * @param cfm the column family meta data
+     * @return a new <code>StatementRestrictions</code> without any restriction
+     */
+    public static StatementRestrictions empty(CFMetaData cfm)
+    {
+        StatementRestrictions noRestrictions = new StatementRestrictions(cfm);
+        return noRestrictions;
+    }
+
+    /**
+     * Initializes the three restriction holders (partition key, clustering columns and
+     * non-primary key columns) without processing any relation.
+     */
+    private StatementRestrictions(CFMetaData cfm)
+    {
+        this.cfm = cfm;
+        partitionKeyRestrictions = new SingleColumnPrimaryKeyRestrictions(cfm.getKeyValidatorAsCType());
+        clusteringColumnsRestrictions = new SingleColumnPrimaryKeyRestrictions(cfm.comparator);
+        nonPrimaryKeyRestrictions = new SingleColumnRestrictions();
+    }
+
+    /**
+     * Builds the restrictions of a statement from the relations of its where-clause, validates them
+     * and determines whether the query is a key range query and whether it needs a secondary index.
+     *
+     * @param cfm the column family meta data
+     * @param whereClause the relations of the where-clause; each one is converted to a restriction
+     * @param boundNames the bind variables, handed to <code>Relation.toRestriction</code>
+     * @param selectsOnlyStaticColumns <code>true</code> if the query selects only static columns, in which
+     * case clustering columns must not be restricted
+     * @param selectACollection <code>true</code> if the query selects a collection column
+     * @throws InvalidRequestException if the restrictions are invalid
+     */
+    public StatementRestrictions(CFMetaData cfm,
+            List<Relation> whereClause,
+            VariableSpecifications boundNames,
+            boolean selectsOnlyStaticColumns,
+            boolean selectACollection) throws InvalidRequestException
+    {
+        this.cfm = cfm;
+        this.partitionKeyRestrictions = new SingleColumnPrimaryKeyRestrictions(cfm.getKeyValidatorAsCType());
+        this.clusteringColumnsRestrictions = new SingleColumnPrimaryKeyRestrictions(cfm.comparator);
+        this.nonPrimaryKeyRestrictions = new SingleColumnRestrictions();
+
+        /*
+         * WHERE clause. For a given entity, rules are:
+         *   - EQ relation conflicts with anything else (including a 2nd EQ)
+         *   - Can't have more than one LT(E) relation (resp. GT(E) relation)
+         *   - IN relation are restricted to row keys (for now) and conflicts with anything else (we could
+         *     allow two IN for the same entity but that doesn't seem very useful)
+         *   - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value
+         *     in CQL so far)
+         */
+        for (Relation relation : whereClause)
+            addRestriction(relation.toRestriction(cfm, boundNames));
+
+        ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
+        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
+
+        boolean hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
+        boolean hasQueriableIndex = hasQueriableClusteringColumnIndex
+                || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
+                || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
+
+        // At this point, the select statement is fully constructed, but we still have a few things to validate
+        processPartitionKeyRestrictions(hasQueriableIndex);
+
+        // Some but not all of the partition key columns have been specified;
+        // hence we need to turn these restrictions into index expressions.
+        if (usesSecondaryIndexing)
+            indexRestrictions.add(partitionKeyRestrictions);
+
+        checkFalse(selectsOnlyStaticColumns && hasClusteringColumnsRestriction(),
+                   "Cannot restrict clustering columns when selecting only static columns");
+
+        processClusteringColumnsRestrictions(hasQueriableIndex, selectACollection);
+
+        // Covers indexes on the first clustering column (among others).
+        if (isKeyRange && hasQueriableClusteringColumnIndex)
+            usesSecondaryIndexing = true;
+
+        if (usesSecondaryIndexing)
+        {
+            indexRestrictions.add(clusteringColumnsRestrictions);
+        }
+        // NOTE(review): processClusteringColumnsRestrictions already sets usesSecondaryIndexing when the
+        // clustering restrictions are CONTAINS, so this branch looks unreachable - confirm before relying on it.
+        else if (clusteringColumnsRestrictions.isContains())
+        {
+            // Forwards to the clustering restrictions but keeps only the CONTAINS / CONTAINS KEY expressions.
+            indexRestrictions.add(new ForwardingPrimaryKeyRestrictions() {
+
+                @Override
+                protected PrimaryKeyRestrictions getDelegate()
+                {
+                    return clusteringColumnsRestrictions;
+                }
+
+                @Override
+                public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
+                {
+                    List<IndexExpression> list = new ArrayList<>();
+                    super.addIndexExpressionTo(list, options);
+
+                    for (IndexExpression expression : list)
+                    {
+                        if (expression.isContains() || expression.isContainsKey())
+                            expressions.add(expression);
+                    }
+                }
+            });
+            usesSecondaryIndexing = true;
+        }
+        // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
+        // there are restrictions not covered by the PK.
+        if (!nonPrimaryKeyRestrictions.isEmpty())
+        {
+            usesSecondaryIndexing = true;
+            indexRestrictions.add(nonPrimaryKeyRestrictions);
+        }
+
+        if (usesSecondaryIndexing)
+            validateSecondaryIndexSelections(selectsOnlyStaticColumns);
+    }
+
+    /**
+     * Routes a restriction to the holder it belongs to: multi-column restrictions are merged into the
+     * clustering columns restrictions, token restrictions into the partition key restrictions, and
+     * any other restriction is handled as a single column restriction.
+     */
+    private void addRestriction(Restriction restriction) throws InvalidRequestException
+    {
+        if (restriction.isMultiColumn())
+        {
+            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
+            return;
+        }
+
+        if (restriction.isOnToken())
+        {
+            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
+            return;
+        }
+
+        addSingleColumnRestriction((SingleColumnRestriction) restriction);
+    }
+
+    /**
+     * Checks the partition key, clustering columns and non-primary key restrictions, in that order,
+     * and stops at the first one that uses the specified function.
+     */
+    public boolean usesFunction(String ksName, String functionName)
+    {
+        if (partitionKeyRestrictions.usesFunction(ksName, functionName))
+            return true;
+
+        if (clusteringColumnsRestrictions.usesFunction(ksName, functionName))
+            return true;
+
+        return nonPrimaryKeyRestrictions.usesFunction(ksName, functionName);
+    }
+
+    /**
+     * Adds a single column restriction to the holder matching the kind of its column:
+     * partition key, clustering column or any other column.
+     */
+    private void addSingleColumnRestriction(SingleColumnRestriction restriction) throws InvalidRequestException
+    {
+        ColumnDefinition column = restriction.getColumnDef();
+
+        if (column.isPartitionKey())
+        {
+            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
+            return;
+        }
+
+        if (column.isClusteringColumn())
+        {
+            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
+            return;
+        }
+
+        nonPrimaryKeyRestrictions = nonPrimaryKeyRestrictions.addRestriction(restriction);
+    }
+
+    /**
+     * Checks if the restrictions on the partition key is an IN restriction.
+     *
+     * @return <code>true</code> the restrictions on the partition key is an IN restriction, <code>false</code>
+     * otherwise.
+     */
+    public boolean keyIsInRelation()
+    {
+        return partitionKeyRestrictions.isIN();
+    }
+
+    /**
+     * Tells whether the query requests a range of partition keys.
+     *
+     * @return <code>true</code> if the query requests a range of partition keys, <code>false</code> otherwise.
+     */
+    public boolean isKeyRange()
+    {
+        return isKeyRange;
+    }
+
+    /**
+     * Tells whether a secondary index needs to be queried.
+     *
+     * @return <code>true</code> if a secondary index needs to be queried, <code>false</code> otherwise.
+     */
+    public boolean usesSecondaryIndexing()
+    {
+        return usesSecondaryIndexing;
+    }
+
+    /**
+     * Validates the partition key restrictions and derives from them whether the query is a key range
+     * query and whether it has to go through a secondary index.
+     *
+     * @param hasQueriableIndex <code>true</code> if some of the restrictions are backed by an index
+     * @throws InvalidRequestException if only a part of the partition key is restricted and no index can be used
+     */
+    private void processPartitionKeyRestrictions(boolean hasQueriableIndex) throws InvalidRequestException
+    {
+        // If there is a queriable index, no special conditions are required on the other restrictions.
+        // But we still need to know 2 things:
+        // - If we don't have a queriable index, is the query ok
+        // - Is it queriable without 2ndary index, which is always more efficient
+        // If a component of the partition key is restricted by a relation, all preceding
+        // components must have a EQ. Only the last partition key component can be in IN relation.
+        if (partitionKeyRestrictions.isOnToken())
+        {
+            isKeyRange = true;
+            return;
+        }
+
+        // Every component is restricted: nothing else to decide here.
+        if (!hasPartitionKeyUnrestrictedComponents())
+            return;
+
+        // A partially restricted partition key can only be served through an index.
+        if (!partitionKeyRestrictions.isEmpty() && !hasQueriableIndex)
+            throw invalidRequest("Partition key parts: %s must be restricted as other parts are",
+                                 Joiner.on(", ").join(getPartitionKeyUnrestrictedComponents()));
+
+        isKeyRange = true;
+        usesSecondaryIndexing = hasQueriableIndex;
+    }
+
+    /**
+     * Tells whether fewer columns are restricted than the partition key has.
+     *
+     * @return <code>true</code> if the partition key has some unrestricted components, <code>false</code> otherwise.
+     */
+    private boolean hasPartitionKeyUnrestrictedComponents()
+    {
+        int restricted = partitionKeyRestrictions.size();
+        int declared = cfm.partitionKeyColumns().size();
+        return restricted < declared;
+    }
+
+    /**
+     * Computes the identifiers of the partition key columns that carry no restriction.
+     *
+     * @return the partition key components that are not restricted.
+     */
+    private List<ColumnIdentifier> getPartitionKeyUnrestrictedComponents()
+    {
+        // Start from a copy of all the partition key columns and drop the restricted ones.
+        List<ColumnDefinition> unrestricted = new ArrayList<>(cfm.partitionKeyColumns());
+        unrestricted.removeAll(partitionKeyRestrictions.getColumnDefs());
+        return ColumnDefinition.toIdentifiers(unrestricted);
+    }
+
+    /**
+     * Processes the clustering column restrictions: validates them and flags the query as requiring
+     * a secondary index when the restricted columns do not form a prefix of the clustering columns
+     * or when a CONTAINS restriction is used.
+     *
+     * @param hasQueriableIndex <code>true</code> if some of the queried data are indexed, <code>false</code> otherwise
+     * @param selectACollection <code>true</code> if the query should return a collection column
+     * @throws InvalidRequestException if the request is invalid
+     */
+    private void processClusteringColumnsRestrictions(boolean hasQueriableIndex,
+                                                      boolean selectACollection) throws InvalidRequestException
+    {
+        checkFalse(clusteringColumnsRestrictions.isIN() && selectACollection,
+                   "Cannot restrict clustering columns by IN relations when a collection is selected by the query");
+        checkFalse(clusteringColumnsRestrictions.isContains() && !hasQueriableIndex,
+                   "Cannot restrict clustering columns by a CONTAINS relation without a secondary index");
+
+        if (hasClusteringColumnsRestriction())
+        {
+            List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
+            // Copied into an ArrayList because the columns are read by index below.
+            List<ColumnDefinition> restrictedColumns = new ArrayList<>(clusteringColumnsRestrictions.getColumnDefs());
+
+            for (int i = 0, m = restrictedColumns.size(); i < m; i++)
+            {
+                ColumnDefinition clusteringColumn = clusteringColumns.get(i);
+                ColumnDefinition restrictedColumn = restrictedColumns.get(i);
+
+                // The i-th restricted column is not the i-th clustering column: a preceding column was skipped.
+                if (!clusteringColumn.equals(restrictedColumn))
+                {
+                    checkTrue(hasQueriableIndex,
+                              "PRIMARY KEY column \"%s\" cannot be restricted as preceding column \"%s\" is not restricted",
+                              restrictedColumn.name,
+                              clusteringColumn.name);
+
+                    usesSecondaryIndexing = true; // handle gaps and non-keyrange cases.
+                    break;
+                }
+            }
+        }
+
+        if (clusteringColumnsRestrictions.isContains())
+            usesSecondaryIndexing = true;
+    }
+
+    public List<IndexExpression> getIndexExpressions(QueryOptions options) throws InvalidRequestException
+    {
+        if (!usesSecondaryIndexing || indexRestrictions.isEmpty())
+            return Collections.emptyList();
+
+        List<IndexExpression> expressions = new ArrayList<>();
+        for (Restrictions restrictions : indexRestrictions)
+            restrictions.addIndexExpressionTo(expressions, options);
+
+        return expressions;
+    }
+
+    /**
+     * Returns the values of the partition keys selected by the query.
+     * The work is delegated to the partition key restrictions.
+     *
+     * @param options the query options
+     * @return the partition keys for which the data is requested
+     * @throws InvalidRequestException if the partition keys cannot be retrieved
+     */
+    public Collection<ByteBuffer> getPartitionKeys(final QueryOptions options) throws InvalidRequestException
+    {
+        Collection<ByteBuffer> keys = partitionKeyRestrictions.values(options);
+        return keys;
+    }
+
+    /**
+     * Returns the requested bound (start or end) of the partition key.
+     *
+     * @param b the boundary type
+     * @param options the query options
+     * @return the requested bound of the partition key, or an empty buffer if some partition key components
+     * are unrestricted
+     * @throws InvalidRequestException if the boundary cannot be retrieved
+     */
+    private ByteBuffer getPartitionKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
+    {
+        if (!hasPartitionKeyUnrestrictedComponents())
+        {
+            // IN queries on keys are dealt with in other places, so only one bound is expected here.
+            List<ByteBuffer> keyBounds = partitionKeyRestrictions.bounds(b, options);
+            return keyBounds.get(0);
+        }
+
+        // Unrestricted partition key components need special-casing (required to deal with 2i queries on the
+        // first component of a composite partition key).
+        return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+    }
+
+    /**
+     * Returns the bounds of the partition keys selected by the query.
+     * Token restrictions and plain key restrictions are handled by two dedicated helpers.
+     *
+     * @param options the query options
+     * @return the partition key bounds
+     * @throws InvalidRequestException if the query is invalid
+     */
+    public AbstractBounds<RowPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException
+    {
+        IPartitioner partitioner = StorageService.getPartitioner();
+
+        return partitionKeyRestrictions.isOnToken()
+                ? getPartitionKeyBoundsForTokenRestrictions(partitioner, options)
+                : getPartitionKeyBounds(partitioner, options);
+    }
+
+    /**
+     * Builds the partition key bounds for restrictions that do not use the token function.
+     *
+     * @param p the partitioner
+     * @param options the query options
+     * @return the partition key bounds, or <code>null</code> if the start key sorts after the end key and the
+     * end key is not the minimum
+     * @throws InvalidRequestException if a bound cannot be retrieved
+     */
+    private AbstractBounds<RowPosition> getPartitionKeyBounds(IPartitioner p,
+                                                              QueryOptions options) throws InvalidRequestException
+    {
+        ByteBuffer lowerBytes = getPartitionKeyBound(Bound.START, options);
+        ByteBuffer upperBytes = getPartitionKeyBound(Bound.END, options);
+
+        RowPosition lower = RowPosition.ForKey.get(lowerBytes, p);
+        RowPosition upper = RowPosition.ForKey.get(upperBytes, p);
+
+        boolean lowerIsAfterUpper = lower.compareTo(upper) > 0;
+        if (lowerIsAfterUpper && !upper.isMinimum())
+            return null;
+
+        boolean lowerInclusive = partitionKeyRestrictions.isInclusive(Bound.START);
+        boolean upperInclusive = partitionKeyRestrictions.isInclusive(Bound.END);
+
+        // Select the bounds implementation matching the inclusiveness of each side.
+        if (lowerInclusive && upperInclusive)
+            return new Bounds<>(lower, upper);
+
+        if (lowerInclusive)
+            return new IncludingExcludingBounds<>(lower, upper);
+
+        if (upperInclusive)
+            return new Range<>(lower, upper);
+
+        return new ExcludingBounds<>(lower, upper);
+    }
+
+    /**
+     * Builds the partition key bounds for restrictions expressed with the token function.
+     *
+     * @param p the partitioner
+     * @param options the query options
+     * @return the partition key bounds, or <code>null</code> if the restrictions cannot select anything
+     * @throws InvalidRequestException if a token bound cannot be retrieved
+     */
+    private AbstractBounds<RowPosition> getPartitionKeyBoundsForTokenRestrictions(IPartitioner p,
+                                                                                  QueryOptions options)
+    throws InvalidRequestException
+    {
+        Token lowerToken = getTokenBound(Bound.START, options, p);
+        Token upperToken = getTokenBound(Bound.END, options, p);
+
+        boolean lowerInclusive = partitionKeyRestrictions.isInclusive(Bound.START);
+        boolean upperInclusive = partitionKeyRestrictions.isInclusive(Bound.END);
+
+        /*
+         * Asking SP.getRangeSlice() for (token(200), token(200)] would happily return the whole ring. Wrapping
+         * ranges do not really make sense for CQL though, and an empty result is wanted in that case
+         * (CASSANDRA-5573), hence the special case below.
+         *
+         * In practice the result must be empty if either the start token is greater than the end token, or both
+         * are equal but one of the bounds is excluded ([a, a] can contain something, but not (a, a], [a, a) or
+         * (a, a)). The rule does not apply when the start or the end token is the minimum token.
+         */
+        int order = lowerToken.compareTo(upperToken);
+        boolean neitherIsMinimum = !(lowerToken.isMinimum() || upperToken.isMinimum());
+        if (neitherIsMinimum)
+        {
+            boolean bothInclusive = lowerInclusive && upperInclusive;
+            if (order > 0 || (order == 0 && !bothInclusive))
+                return null;
+        }
+
+        RowPosition from;
+        if (lowerInclusive)
+            from = lowerToken.minKeyBound();
+        else
+            from = lowerToken.maxKeyBound();
+
+        RowPosition to;
+        if (upperInclusive)
+            to = upperToken.maxKeyBound();
+        else
+            to = upperToken.minKeyBound();
+
+        return new Range<>(from, to);
+    }
+
+    /**
+     * Returns the token corresponding to the specified bound of the partition key restrictions.
+     *
+     * @param b the boundary type
+     * @param options the query options
+     * @param p the partitioner
+     * @return the token of the specified bound, or the minimum token of the partitioner if the restrictions
+     * have no such bound
+     * @throws InvalidRequestException if the bound value is <code>null</code> or cannot be retrieved
+     */
+    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException
+    {
+        if (partitionKeyRestrictions.hasBound(b))
+        {
+            ByteBuffer tokenBytes = partitionKeyRestrictions.bounds(b, options).get(0);
+            checkNotNull(tokenBytes, "Invalid null token value");
+            return p.getTokenFactory().fromByteArray(tokenBytes);
+        }
+
+        return p.getMinimumToken();
+    }
+
+    /**
+     * Checks if the query does not contain any restriction on the clustering columns.
+     *
+     * @return <code>true</code> if no clustering column is restricted by the query, <code>false</code> otherwise.
+     */
+    public boolean hasNoClusteringColumnsRestriction()
+    {
+        return !hasClusteringColumnsRestriction();
+    }
+
+    // For non-composite slices, we don't support internally the difference between exclusive and
+    // inclusive bounds, so we deal with it manually.
+    public boolean isNonCompositeSliceWithExclusiveBounds()
+    {
+        return !cfm.comparator.isCompound()
+                && clusteringColumnsRestrictions.isSlice()
+                && (!clusteringColumnsRestrictions.isInclusive(Bound.START) || !clusteringColumnsRestrictions.isInclusive(Bound.END));
+    }
+
+    /**
+     * Returns the values of the requested clustering columns as <code>Composite</code>s.
+     * The work is delegated to the clustering columns restrictions.
+     *
+     * @param options the query options
+     * @return the requested clustering columns as <code>Composite</code>s
+     * @throws InvalidRequestException if the query is not valid
+     */
+    public List<Composite> getClusteringColumnsAsComposites(QueryOptions options) throws InvalidRequestException
+    {
+        List<Composite> composites = clusteringColumnsRestrictions.valuesAsComposites(options);
+        return composites;
+    }
+
+    /**
+     * Returns the requested bounds (start or end) of the clustering columns as <code>Composite</code>s.
+     * The work is delegated to the clustering columns restrictions.
+     *
+     * @param b the bound type
+     * @param options the query options
+     * @return the bounds (start or end) of the clustering columns as <code>Composite</code>s
+     * @throws InvalidRequestException if the request is not valid
+     */
+    public List<Composite> getClusteringColumnsBoundsAsComposites(Bound b,
+                                                                  QueryOptions options) throws InvalidRequestException
+    {
+        List<Composite> composites = clusteringColumnsRestrictions.boundsAsComposites(b, options);
+        return composites;
+    }
+
+    /**
+     * Returns the requested bounds (start or end) of the clustering columns as raw buffers.
+     * The work is delegated to the clustering columns restrictions.
+     *
+     * @param b the bound type
+     * @param options the query options
+     * @return the bounds (start or end) of the clustering columns
+     * @throws InvalidRequestException if the request is not valid
+     */
+    public List<ByteBuffer> getClusteringColumnsBounds(Bound b, QueryOptions options) throws InvalidRequestException
+    {
+        List<ByteBuffer> rawBounds = clusteringColumnsRestrictions.bounds(b, options);
+        return rawBounds;
+    }
+
+    /**
+     * Checks if the bounds (start or end) of the clustering columns are inclusive.
+     *
+     * @param bound the bound type
+     * @return <code>true</code> if the bounds (start or end) of the clustering columns are inclusive,
+     * <code>false</code> otherwise
+     */
+    public boolean areRequestedBoundsInclusive(Bound bound)
+    {
+        return clusteringColumnsRestrictions.isInclusive(bound);
+    }
+
+    /**
+     * Checks if the query returns a range of columns.
+     *
+     * @return <code>true</code> if the query returns a range of columns, <code>false</code> otherwise.
+     */
+    public boolean isColumnRange()
+    {
+        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
+        // Static CF (non dense but non composite) never entails a column slice however
+        if (!cfm.comparator.isDense())
+            return cfm.comparator.isCompound();
+
+        // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about
+        // CASSANDRA-5762),
+        // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
+        return clusteringColumnsRestrictions.size() < cfm.clusteringColumns().size() || clusteringColumnsRestrictions.isSlice();
+    }
+
+    /**
+     * Checks if the query needs to use filtering.
+     *
+     * @return <code>true</code> if the query needs to use filtering, <code>false</code> otherwise.
+     */
+    public boolean needFiltering()
+    {
+        // Total number of columns restricted across all the index restrictions.
+        int restrictedColumnCount = 0;
+        for (Restrictions group : indexRestrictions)
+        {
+            restrictedColumnCount += group.size();
+        }
+
+        if (restrictedColumnCount > 1)
+            return true;
+
+        // With no index restriction, the answer depends on the clustering columns being restricted.
+        if (restrictedColumnCount == 0)
+            return !clusteringColumnsRestrictions.isEmpty();
+
+        return nonPrimaryKeyRestrictions.hasMultipleContains();
+    }
+
+    /**
+     * Validates a selection that relies on a secondary index.
+     *
+     * @param selectsOnlyStaticColumns <code>true</code> if the query selects only static columns
+     * @throws InvalidRequestException if the key is restricted by an IN relation or if only static columns
+     * are selected
+     */
+    private void validateSecondaryIndexSelections(boolean selectsOnlyStaticColumns) throws InvalidRequestException
+    {
+        boolean keyRestrictedByIn = keyIsInRelation();
+        checkFalse(keyRestrictedByIn,
+                   "Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+
+        // When the user selects only static columns, the intent is to query just the static parts and not the
+        // whole partition. But 1) there is no easy way to do that with 2i and 2) since indexes on static columns
+        // are not supported so far, 2i means that a non static column has been restricted, so the query is
+        // somewhat non-sensical.
+        checkFalse(selectsOnlyStaticColumns, "Queries using 2ndary indexes don't support selecting only static columns");
+    }
+
+    /**
+     * Checks if the query restricts at least one clustering column.
+     *
+     * @return <code>true</code> if the clustering columns restrictions are not empty, <code>false</code> otherwise.
+     */
+    private boolean hasClusteringColumnsRestriction()
+    {
+        boolean unrestricted = clusteringColumnsRestrictions.isEmpty();
+        return !unrestricted;
+    }
+
+    /**
+     * Replaces the clustering columns restrictions by a <code>ReversedPrimaryKeyRestrictions</code> wrapping
+     * the current ones.
+     */
+    public void reverse()
+    {
+        ReversedPrimaryKeyRestrictions reversed = new ReversedPrimaryKeyRestrictions(clusteringColumnsRestrictions);
+        clusteringColumnsRestrictions = reversed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java b/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
new file mode 100644
index 0000000..3622220
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TermSlice.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.index.SecondaryIndex;
+
+final class TermSlice
+{
+    /**
+     * The slice boundaries (start first, end second), looked up through <code>Bound.idx</code>.
+     * An entry is <code>null</code> when the slice has no boundary on that side.
+     */
+    private final Term[] bounds;
+
+    /**
+     * The inclusiveness of each slice boundary, laid out like <code>bounds</code>.
+     */
+    private final boolean[] boundInclusive;
+
+    /**
+     * Creates a new <code>TermSlice</code> from both of its boundaries.
+     *
+     * @param start the lower boundary
+     * @param includeStart <code>true</code> if the lower boundary is inclusive
+     * @param end the upper boundary
+     * @param includeEnd <code>true</code> if the upper boundary is inclusive
+     */
+    private TermSlice(Term start, boolean includeStart, Term end, boolean includeEnd)
+    {
+        bounds = new Term[]{start, end};
+        boundInclusive = new boolean[]{includeStart, includeEnd};
+    }
+
+    /**
+     * Creates a new <code>TermSlice</code> having a single boundary; the other side is left without boundary.
+     *
+     * @param bound the boundary type
+     * @param include <code>true</code> if the boundary is inclusive
+     * @param term the value
+     * @return a new <code>TermSlice</code> instance
+     */
+    public static TermSlice newInstance(Bound bound, boolean include, Term term)
+    {
+        if (bound.isStart())
+            return new TermSlice(term, include, null, false);
+
+        return new TermSlice(null, false, term, include);
+    }
+
+    /**
+     * Returns the value of the specified boundary.
+     *
+     * @param bound the boundary type
+     * @return the boundary value, or <code>null</code> if this slice has no such boundary
+     */
+    public Term bound(Bound bound)
+    {
+        return bounds[bound.idx];
+    }
+
+    /**
+     * Checks if this slice has a boundary of the specified type.
+     *
+     * @param b the boundary type
+     * @return <code>true</code> if this slice has a boundary of the specified type, <code>false</code> otherwise.
+     */
+    public boolean hasBound(Bound b)
+    {
+        return bound(b) != null;
+    }
+
+    /**
+     * Checks if the boundary of the specified type is inclusive. A missing boundary is reported as inclusive.
+     *
+     * @param b the boundary type
+     * @return <code>true</code> if this slice has no boundary of the specified type or if that boundary is
+     * inclusive, <code>false</code> otherwise.
+     */
+    public boolean isInclusive(Bound b)
+    {
+        return !hasBound(b) || boundInclusive[b.idx];
+    }
+
+    /**
+     * Merges this slice with the specified one. Each boundary type is expected to be set on at most one of
+     * the two slices (checked by assertions).
+     *
+     * @param otherSlice the slice to merge to
+     * @return the new slice resulting from the merge
+     */
+    public TermSlice merge(TermSlice otherSlice)
+    {
+        if (!hasBound(Bound.START))
+        {
+            // This slice can only contribute the end boundary: the start comes from the other slice.
+            assert !otherSlice.hasBound(Bound.END);
+
+            return new TermSlice(otherSlice.bound(Bound.START),
+                                 otherSlice.isInclusive(Bound.START),
+                                 bound(Bound.END),
+                                 isInclusive(Bound.END));
+        }
+
+        // This slice provides the start boundary: the end comes from the other slice.
+        assert !otherSlice.hasBound(Bound.START);
+
+        return new TermSlice(bound(Bound.START),
+                             isInclusive(Bound.START),
+                             otherSlice.bound(Bound.END),
+                             otherSlice.isInclusive(Bound.END));
+    }
+
+    @Override
+    public String toString()
+    {
+        String startOperator = boundInclusive[0] ? ">=" : ">";
+        String endOperator = boundInclusive[1] ? "<=" : "<";
+
+        return String.format("(%s %s, %s %s)", startOperator, bounds[0], endOperator, bounds[1]);
+    }
+
+    /**
+     * Returns the index operator corresponding to the specified boundary.
+     *
+     * @param b the boundary type
+     * @return <code>GTE</code> or <code>GT</code> for a start boundary, <code>LTE</code> or <code>LT</code> for
+     * an end boundary, depending on the inclusiveness flag stored for that boundary
+     */
+    public Operator getIndexOperator(Bound b)
+    {
+        boolean inclusive = boundInclusive[b.idx];
+
+        if (b.isStart())
+            return inclusive ? Operator.GTE : Operator.GT;
+
+        return inclusive ? Operator.LTE : Operator.LT;
+    }
+
+    /**
+     * Checks if this <code>TermSlice</code> is supported by the specified index.
+     *
+     * @param index the Secondary index
+     * @return <code>true</code> if the index supports the operator of at least one of the boundaries of this
+     * slice, <code>false</code> otherwise.
+     */
+    public boolean isSupportedBy(SecondaryIndex index)
+    {
+        // Both sides are evaluated before being combined; a missing boundary never counts as supported.
+        boolean startSupported = hasBound(Bound.START)
+                && index.supportsOperator(isInclusive(Bound.START) ? Operator.GTE : Operator.GT);
+        boolean endSupported = hasBound(Bound.END)
+                && index.supportsOperator(isInclusive(Bound.END) ? Operator.LTE : Operator.LT);
+
+        return startSupported || endSupported;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
new file mode 100644
index 0000000..85d614e
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.restrictions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
+/**
+ * <code>Restriction</code> using the token function.
+ */
+public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
+{
+    /**
+     * The definition of the columns to which apply the token restriction.
+     */
+    protected final List<ColumnDefinition> columnDefs;
+
+    /**
+     * Creates a new <code>TokenRestriction</code> that apply to the specified columns.
+     *
+     * @param columnDefs the definition of the columns to which apply the token restriction
+     */
+    public TokenRestriction(List<ColumnDefinition> columnDefs)
+    {
+        this.columnDefs = columnDefs;
+    }
+
+    /**
+     * {@inheritDoc}
+     * Always <code>true</code> for a token restriction.
+     */
+    @Override
+    public  boolean isOnToken()
+    {
+        return true;
+    }
+
+    @Override
+    public Collection<ColumnDefinition> getColumnDefs()
+    {
+        return columnDefs;
+    }
+
+    /**
+     * {@inheritDoc}
+     * Always <code>false</code> for a token restriction.
+     */
+    @Override
+    public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
+    {
+        return false;
+    }
+
+    /**
+     * Not supported by token restrictions.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options)
+    {
+        throw new UnsupportedOperationException("Index expression cannot be created for token restriction");
+    }
+
+    /**
+     * Not supported by token restrictions.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by token restrictions.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns the column names as a comma separated <code>String</code>.
+     *
+     * @return the column names as a comma separated <code>String</code>.
+     */
+    protected final String getColumnNamesAsString()
+    {
+        return Joiner.on(", ").join(ColumnDefinition.toIdentifiers(columnDefs));
+    }
+
+    /**
+     * A <code>TokenRestriction</code> for an equality relation on the token.
+     */
+    public static final class EQ extends TokenRestriction
+    {
+        /**
+         * The term the token must be equal to.
+         */
+        private final Term value;
+
+        /**
+         * Creates a new equality restriction on the token of the specified columns.
+         *
+         * @param columnDefs the definition of the columns to which apply the token restriction
+         * @param value the term the token must be equal to
+         */
+        public EQ(List<ColumnDefinition> columnDefs, Term value)
+        {
+            super(columnDefs);
+            this.value = value;
+        }
+
+        @Override
+        public boolean isEQ()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return usesFunction(value, ksName, functionName);
+        }
+
+        /**
+         * Always rejects the merge: an equality on the token cannot be combined with another relation.
+         *
+         * @throws InvalidRequestException always
+         */
+        @Override
+        public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
+        {
+            // Uses the shared helper, like Slice.mergeWith, instead of rebuilding the column list inline.
+            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal",
+                                 getColumnNamesAsString());
+        }
+
+        /**
+         * Returns a single-element list holding the bound value of the term.
+         */
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            return Collections.singletonList(value.bindAndGet(options));
+        }
+    }
+
+    /**
+     * A <code>TokenRestriction</code> for inequality relations on the token.
+     */
+    public static class Slice extends TokenRestriction
+    {
+        /**
+         * The boundaries of the slice.
+         */
+        private final TermSlice slice;
+
+        /**
+         * Creates a new slice restriction having a single boundary.
+         *
+         * @param columnDefs the definition of the columns to which apply the token restriction
+         * @param bound the boundary type
+         * @param inclusive <code>true</code> if the boundary is inclusive
+         * @param term the boundary value
+         */
+        public Slice(List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
+        {
+            super(columnDefs);
+            slice = TermSlice.newInstance(bound, inclusive, term);
+        }
+
+        @Override
+        public boolean isSlice()
+        {
+            return true;
+        }
+
+        /**
+         * Not supported by slice restrictions.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean hasBound(Bound b)
+        {
+            return slice.hasBound(b);
+        }
+
+        /**
+         * Returns a single-element list holding the bound value of the specified boundary.
+         * The boundary term is dereferenced directly, so callers are expected to check
+         * <code>hasBound(b)</code> first.
+         */
+        @Override
+        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
+        {
+            return Collections.singletonList(slice.bound(b).bindAndGet(options));
+        }
+
+        @Override
+        public boolean usesFunction(String ksName, String functionName)
+        {
+            return (slice.hasBound(Bound.START) && usesFunction(slice.bound(Bound.START), ksName, functionName))
+                    || (slice.hasBound(Bound.END) && usesFunction(slice.bound(Bound.END), ksName, functionName));
+        }
+
+        @Override
+        public boolean isInclusive(Bound b)
+        {
+            return slice.isInclusive(b);
+        }
+
+        /**
+         * Merges this slice with another token slice that restricts the opposite boundary.
+         *
+         * @param otherRestriction the restriction to merge with
+         * @return a new slice restriction combining the boundaries of both restrictions
+         * @throws InvalidRequestException if the other restriction is not on the token, is not a slice, or
+         * restricts a boundary that this restriction already restricts
+         */
+        @Override
+        public PrimaryKeyRestrictions mergeWith(Restriction otherRestriction)
+        throws InvalidRequestException
+        {
+            if (!otherRestriction.isOnToken())
+                throw invalidRequest("Columns \"%s\" cannot be restricted by both a normal relation and a token relation",
+                                     getColumnNamesAsString());
+
+            if (!otherRestriction.isSlice())
+                throw invalidRequest("Columns \"%s\" cannot be restricted by both an equality and an inequality relation",
+                                     getColumnNamesAsString());
+
+            // The cast relies on the two checks above having passed.
+            TokenRestriction.Slice otherSlice = (TokenRestriction.Slice) otherRestriction;
+
+            if (hasBound(Bound.START) && otherSlice.hasBound(Bound.START))
+                throw invalidRequest("More than one restriction was found for the start bound on %s",
+                                     getColumnNamesAsString());
+
+            if (hasBound(Bound.END) && otherSlice.hasBound(Bound.END))
+                throw invalidRequest("More than one restriction was found for the end bound on %s",
+                                     getColumnNamesAsString());
+
+            return new Slice(columnDefs,  slice.merge(otherSlice.slice));
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("SLICE%s", slice);
+        }
+
+        /**
+         * Creates a new slice restriction from an already built <code>TermSlice</code>.
+         *
+         * @param columnDefs the definition of the columns to which apply the token restriction
+         * @param slice the boundaries of the slice
+         */
+        private Slice(List<ColumnDefinition> columnDefs, TermSlice slice)
+        {
+            super(columnDefs);
+            this.slice = slice;
+        }
+    }
+}