You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/16 13:42:24 UTC

[06/11] cassandra git commit: Merge branch cassandra-2.1 into cassandra-2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9583b6b3/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 735a2e2,0000000..1f4960b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@@ -1,626 -1,0 +1,669 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +
++import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.Term.Terminal;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.db.marshal.CompositeType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +public abstract class SingleColumnRestriction extends AbstractRestriction
 +{
 +    /**
 +     * The definition of the column to which the restriction applies.
 +     */
 +    protected final ColumnDefinition columnDef;
 +
 +    public SingleColumnRestriction(ColumnDefinition columnDef)
 +    {
 +        this.columnDef = columnDef;
 +    }
 +
 +    @Override
 +    public List<ColumnDefinition> getColumnDefs()
 +    {
 +        return Collections.singletonList(columnDef);
 +    }
 +
 +    @Override
 +    public ColumnDefinition getFirstColumn()
 +    {
 +        return columnDef;
 +    }
 +
 +    @Override
 +    public ColumnDefinition getLastColumn()
 +    {
 +        return columnDef;
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        SecondaryIndex index = indexManager.getIndexForColumn(columnDef.name.bytes);
 +        return index != null && isSupportedBy(index);
 +    }
 +
 +    @Override
 +    public final Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +    {
 +        // We want to allow queries like: b > ? AND (b,c) < (?, ?)
 +        if (otherRestriction.isMultiColumn() && canBeConvertedToMultiColumnRestriction())
 +        {
 +            return toMultiColumnRestriction().mergeWith(otherRestriction);
 +        }
 +
 +        return doMergeWith(otherRestriction);
 +    }
 +
 +    protected abstract Restriction doMergeWith(Restriction otherRestriction) throws InvalidRequestException;
 +
 +    /**
 +     * Converts this <code>SingleColumnRestriction</code> into a {@link MultiColumnRestriction}
 +     *
 +     * @return the <code>MultiColumnRestriction</code> corresponding to this restriction
 +     */
 +    abstract MultiColumnRestriction toMultiColumnRestriction();
 +
 +    /**
 +     * Checks if this <code>Restriction</code> can be converted into a {@link MultiColumnRestriction}
 +     *
 +     * @return <code>true</code> if this <code>Restriction</code> can be converted into a
 +     * {@link MultiColumnRestriction}, <code>false</code> otherwise.
 +     */
 +    boolean canBeConvertedToMultiColumnRestriction()
 +    {
 +        return true;
 +    }
 +
 +    /**
 +     * Check if this type of restriction is supported by the specified index.
 +     *
 +     * @param index the Secondary index
 +     * @return <code>true</code> if this type of restriction is supported by the specified index,
 +     * <code>false</code> otherwise.
 +     */
 +    protected abstract boolean isSupportedBy(SecondaryIndex index);
 +
 +    public static final class EQ extends SingleColumnRestriction
 +    {
 +        private final Term value;
 +
 +        public EQ(ColumnDefinition columnDef, Term value)
 +        {
 +            super(columnDef);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return value.getFunctions();
 +        }
 +
 +        @Override
 +        public boolean isEQ()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            return new MultiColumnRestriction.EQ(Collections.singletonList(columnDef), value);
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options) throws InvalidRequestException
 +        {
 +            ByteBuffer buffer = validateIndexedValue(columnDef, value.bindAndGet(options));
 +            expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, buffer));
 +        }
 +
 +        @Override
-         public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
++        public CompositesBuilder appendTo(CFMetaData cfm, CompositesBuilder builder, QueryOptions options)
 +        {
 +            builder.addElementToAll(value.bindAndGet(options));
 +            checkFalse(builder.containsNull(), "Invalid null value in condition for column %s", columnDef.name);
 +            checkFalse(builder.containsUnset(), "Invalid unset value for column %s", columnDef.name);
 +            return builder;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("EQ(%s)", value);
 +        }
 +
 +        @Override
 +        public Restriction doMergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal", columnDef.name);
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.EQ);
 +        }
++
++        @Override
++        public boolean isNotReturningAnyRows(CFMetaData cfm, QueryOptions options)
++        {
++            assert columnDef.isClusteringColumn();
++
++            // Dense non-compound tables do not accept empty ByteBuffers. By consequence, we know that
++            // any query with an EQ restriction containing an empty value will not return any results.
++            return !cfm.comparator.isCompound() && !value.bindAndGet(options).hasRemaining();
++        }
 +    }
 +
 +    public static abstract class IN extends SingleColumnRestriction
 +    {
 +        public IN(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +
 +        @Override
 +        public final boolean isIN()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public final Restriction doMergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a IN", columnDef.name);
 +        }
 +
 +        @Override
-         public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
++        public CompositesBuilder appendTo(CFMetaData cfm, CompositesBuilder builder, QueryOptions options)
 +        {
-             builder.addEachElementToAll(getValues(options));
++            List<ByteBuffer> values = filterValuesIfNeeded(cfm, getValues(options));
++
++            builder.addEachElementToAll(values);
 +            checkFalse(builder.containsNull(), "Invalid null value in condition for column %s", columnDef.name);
 +            checkFalse(builder.containsUnset(), "Invalid unset value for column %s", columnDef.name);
 +            return builder;
 +        }
 +
++        private List<ByteBuffer> filterValuesIfNeeded(CFMetaData cfm, List<ByteBuffer> values)
++        {
++            if (!columnDef.isClusteringColumn() || cfm.comparator.isCompound())
++                return values;
++
++            // Dense non-compound tables do not accept empty ByteBuffers. By consequence, we know that we can
++            // ignore any IN value which is an empty byte buffer and which would otherwise trigger an error.
++
++            // As some List implementations do not support remove, we copy the list to be on the safe side.
++            List<ByteBuffer> filteredValues = new ArrayList<>(values.size());
++            for (ByteBuffer value : values)
++            {
++                if (value.hasRemaining())
++                    filteredValues.add(value);
++            }
++            return filteredValues;
++        }
++
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> values = getValues(options);
 +            checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
 +
 +            ByteBuffer value = validateIndexedValue(columnDef, values.get(0));
 +            expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, value));
 +        }
 +
 +        @Override
 +        protected final boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.IN);
 +        }
 +
 +        protected abstract List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException;
 +    }
 +
 +    public static class InWithValues extends IN
 +    {
 +        protected final List<Term> values;
 +
 +        public InWithValues(ColumnDefinition columnDef, List<Term> values)
 +        {
 +            super(columnDef);
 +            this.values = values;
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            return new MultiColumnRestriction.InWithValues(Collections.singletonList(columnDef), values);
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return Terms.getFunctions(values);
 +        }
 +
 +        @Override
 +        protected List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(values.size());
 +            for (Term value : values)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("IN(%s)", values);
 +        }
 +    }
 +
 +    public static class InWithMarker extends IN
 +    {
 +        protected final AbstractMarker marker;
 +
 +        public InWithMarker(ColumnDefinition columnDef, AbstractMarker marker)
 +        {
 +            super(columnDef);
 +            this.marker = marker;
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return Collections.emptySet();
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            return new MultiColumnRestriction.InWithMarker(Collections.singletonList(columnDef), marker);
 +        }
 +
 +        @Override
 +        protected List<ByteBuffer> getValues(QueryOptions options) throws InvalidRequestException
 +        {
 +            Terminal term = marker.bind(options);
 +            checkNotNull(term, "Invalid null value for column %s", columnDef.name);
 +            checkFalse(term == Constants.UNSET_VALUE, "Invalid unset value for column %s", columnDef.name);
 +            Term.MultiItemTerminal lval = (Term.MultiItemTerminal) term;
 +            return lval.getElements();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "IN ?";
 +        }
 +    }
 +
 +    public static final class Slice extends SingleColumnRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(ColumnDefinition columnDef, Bound bound, boolean inclusive, Term term)
 +        {
 +            super(columnDef);
 +            slice = TermSlice.newInstance(bound, inclusive, term);
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return slice.getFunctions();
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            return new MultiColumnRestriction.Slice(Collections.singletonList(columnDef), slice);
 +        }
 +
 +        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
++        public CompositesBuilder appendTo(CFMetaData cfm, CompositesBuilder builder, QueryOptions options)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
-         public CompositesBuilder appendBoundTo(CompositesBuilder builder, Bound bound, QueryOptions options)
++        public CompositesBuilder appendBoundTo(CFMetaData cfm, CompositesBuilder builder, Bound bound, QueryOptions options)
 +        {
 +            Bound b = reverseBoundIfNeeded(getFirstColumn(), bound);
 +
 +            if (!hasBound(b))
 +                return builder;
 +
 +            ByteBuffer value = slice.bound(b).bindAndGet(options);
 +            checkBindValueSet(value, "Invalid unset value for column %s", columnDef.name);
 +            return builder.addElementToAll(value);
 +
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
 +        public Restriction doMergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isSlice(),
 +                      "Column \"%s\" cannot be restricted by both an equality and an inequality relation",
 +                      columnDef.name);
 +
 +            SingleColumnRestriction.Slice otherSlice = (SingleColumnRestriction.Slice) otherRestriction;
 +
 +            checkFalse(hasBound(Bound.START) && otherSlice.hasBound(Bound.START),
 +                       "More than one restriction was found for the start bound on %s", columnDef.name);
 +
 +            checkFalse(hasBound(Bound.END) && otherSlice.hasBound(Bound.END),
 +                       "More than one restriction was found for the end bound on %s", columnDef.name);
 +
 +            return new Slice(columnDef,  slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options) throws InvalidRequestException
 +        {
 +            for (Bound b : Bound.values())
 +            {
 +                if (hasBound(b))
 +                {
 +                    ByteBuffer value = validateIndexedValue(columnDef, slice.bound(b).bindAndGet(options));
 +                    Operator op = slice.getIndexOperator(b);
 +                    // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operations
 +                    // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
 +                    // use the underlying comparator as is.
 +                    op = columnDef.isReversedType() ? op.reverse() : op;
 +                    expressions.add(new IndexExpression(columnDef.name.bytes, op, value));
 +                }
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return slice.isSupportedBy(index);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("SLICE%s", slice);
 +        }
 +
++        @Override
++        public boolean isNotReturningAnyRows(CFMetaData cfm, QueryOptions options)
++        {
++            assert columnDef.isClusteringColumn();
++
++            // Dense non-compound tables do not accept empty ByteBuffers. By consequence, we know that
++            // any query with a slice restriction with an empty value for the END bound will not return any results.
++            return !cfm.comparator.isCompound()
++                    && hasBound(Bound.END)
++                    && !slice.bound(Bound.END).bindAndGet(options).hasRemaining();
++        }
++
 +        private Slice(ColumnDefinition columnDef, TermSlice slice)
 +        {
 +            super(columnDef);
 +            this.slice = slice;
 +        }
 +    }
 +
 +    // This holds CONTAINS, CONTAINS_KEY, and map[key] = value restrictions because we might want to have any combination of them.
 +    public static final class Contains extends SingleColumnRestriction
 +    {
 +        private List<Term> values = new ArrayList<>(); // for CONTAINS
 +        private List<Term> keys = new ArrayList<>(); // for CONTAINS_KEY
 +        private List<Term> entryKeys = new ArrayList<>(); // for map[key] = value
 +        private List<Term> entryValues = new ArrayList<>(); // for map[key] = value
 +
 +        public Contains(ColumnDefinition columnDef, Term t, boolean isKey)
 +        {
 +            super(columnDef);
 +            if (isKey)
 +                keys.add(t);
 +            else
 +                values.add(t);
 +        }
 +
 +        public Contains(ColumnDefinition columnDef, Term mapKey, Term mapValue)
 +        {
 +            super(columnDef);
 +            entryKeys.add(mapKey);
 +            entryValues.add(mapValue);
 +        }
 +
 +        @Override
 +        MultiColumnRestriction toMultiColumnRestriction()
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        boolean canBeConvertedToMultiColumnRestriction()
 +        {
 +            return false;
 +        }
 +
 +        @Override
-         public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
++        public CompositesBuilder appendTo(CFMetaData cfm, CompositesBuilder builder, QueryOptions options)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean isContains()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public Restriction doMergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isContains(),
 +                      "Collection column %s can only be restricted by CONTAINS, CONTAINS KEY, or map-entry equality",
 +                      columnDef.name);
 +
 +            SingleColumnRestriction.Contains newContains = new Contains(columnDef);
 +
 +            copyKeysAndValues(this, newContains);
 +            copyKeysAndValues((Contains) otherRestriction, newContains);
 +
 +            return newContains;
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options)
 +                                         throws InvalidRequestException
 +        {
 +            addExpressionsFor(expressions, bindAndGet(values, options), Operator.CONTAINS);
 +            addExpressionsFor(expressions, bindAndGet(keys, options), Operator.CONTAINS_KEY);
 +            addExpressionsFor(expressions, entries(options), Operator.EQ);
 +        }
 +
 +        private void addExpressionsFor(List<IndexExpression> target, List<ByteBuffer> values,
 +                                       Operator op) throws InvalidRequestException
 +        {
 +            for (ByteBuffer value : values)
 +            {
 +                validateIndexedValue(columnDef, value);
 +                target.add(new IndexExpression(columnDef.name.bytes, op, value));
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            boolean supported = false;
 +
 +            if (numberOfValues() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS);
 +
 +            if (numberOfKeys() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS_KEY);
 +
 +            if (numberOfEntries() > 0)
 +                supported |= index.supportsOperator(Operator.EQ);
 +
 +            return supported;
 +        }
 +
 +        public int numberOfValues()
 +        {
 +            return values.size();
 +        }
 +
 +        public int numberOfKeys()
 +        {
 +            return keys.size();
 +        }
 +
 +        public int numberOfEntries()
 +        {
 +            return entryKeys.size();
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return Iterables.concat(Terms.getFunctions(values),
 +                                    Terms.getFunctions(keys),
 +                                    Terms.getFunctions(entryKeys),
 +                                    Terms.getFunctions(entryValues));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("CONTAINS(values=%s, keys=%s, entryKeys=%s, entryValues=%s)", values, keys, entryKeys, entryValues);
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public CompositesBuilder appendBoundTo(CompositesBuilder builder, Bound bound, QueryOptions options)
++        public CompositesBuilder appendBoundTo(CFMetaData cfm, CompositesBuilder builder, Bound bound, QueryOptions options)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        private List<ByteBuffer> entries(QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> entryBuffers = new ArrayList<>(entryKeys.size());
 +            List<ByteBuffer> keyBuffers = bindAndGet(entryKeys, options);
 +            List<ByteBuffer> valueBuffers = bindAndGet(entryValues, options);
 +            for (int i = 0; i < entryKeys.size(); i++)
 +            {
 +                if (valueBuffers.get(i) == null)
 +                    throw new InvalidRequestException("Unsupported null value for map-entry equality");
 +                entryBuffers.add(CompositeType.build(keyBuffers.get(i), valueBuffers.get(i)));
 +            }
 +            return entryBuffers;
 +        }
 +
 +        /**
 +         * Binds the query options to the specified terms and returns the resulting values.
 +         *
 +         * @param terms the terms
 +         * @param options the query options
 +         * @return the values resulting from binding the query options to the specified terms
 +         * @throws InvalidRequestException if a problem occurs while binding the query options
 +         */
 +        private static List<ByteBuffer> bindAndGet(List<Term> terms, QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(terms.size());
 +            for (Term value : terms)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        /**
 +         * Copies the keys and values from the first <code>Contains</code> to the second one.
 +         *
 +         * @param from the <code>Contains</code> to copy from
 +         * @param to the <code>Contains</code> to copy to
 +         */
 +        private static void copyKeysAndValues(Contains from, Contains to)
 +        {
 +            to.values.addAll(from.values);
 +            to.keys.addAll(from.keys);
 +            to.entryKeys.addAll(from.entryKeys);
 +            to.entryValues.addAll(from.entryValues);
 +        }
 +
 +        private Contains(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9583b6b3/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 1547210,0000000..8035877
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@@ -1,645 -1,0 +1,656 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +import static org.apache.cassandra.config.ColumnDefinition.toIdentifiers;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * The restrictions corresponding to the relations specified in the WHERE clause of a CQL query.
 + */
 +public final class StatementRestrictions
 +{
 +    /**
 +     * The error message used when a query would require filtering but <code>ALLOW FILTERING</code> was not specified.
 +     */
 +    public static final String REQUIRES_ALLOW_FILTERING_MESSAGE =
 +            "Cannot execute this query as it might involve data filtering and " +
 +            "thus may have unpredictable performance. If you want to execute " +
 +            "this query despite the performance unpredictability, use ALLOW FILTERING";
 +
 +    /**
 +     * The Column Family meta data
 +     */
 +    public final CFMetaData cfm;
 +
 +    /**
 +     * Restrictions on the partition key columns
 +     */
 +    private PrimaryKeyRestrictions partitionKeyRestrictions;
 +
 +    /**
 +     * Restrictions on the clustering columns
 +     */
 +    private PrimaryKeyRestrictions clusteringColumnsRestrictions;
 +
 +    /**
 +     * Restrictions on the non-primary key columns (i.e. secondary index restrictions)
 +     */
 +    private RestrictionSet nonPrimaryKeyRestrictions;
 +
 +    /**
 +     * The restrictions used to build the index expressions
 +     */
 +    private final List<Restrictions> indexRestrictions = new ArrayList<>();
 +
 +    /**
 +     * <code>true</code> if the secondary index needs to be queried, <code>false</code> otherwise
 +     */
 +    private boolean usesSecondaryIndexing;
 +
 +    /**
 +     * Specifies whether the query will return a range of partition keys.
 +     */
 +    private boolean isKeyRange;
 +
 +    /**
 +     * Creates a new empty <code>StatementRestrictions</code>, i.e. one built without any where-clause relation.
 +     *
 +     * @param cfm the column family meta data
 +     * @return a new empty <code>StatementRestrictions</code>.
 +     */
 +    public static StatementRestrictions empty(CFMetaData cfm)
 +    {
 +        return new StatementRestrictions(cfm);
 +    }
 +
 +    /**
 +     * Creates a <code>StatementRestrictions</code> to which no relation has been added: the partition key,
 +     * clustering column and non-primary key restrictions are all initialized with newly created sets.
 +     *
 +     * @param cfm the column family meta data
 +     */
 +    private StatementRestrictions(CFMetaData cfm)
 +    {
 +        this.cfm = cfm;
 +        this.partitionKeyRestrictions = new PrimaryKeyRestrictionSet(cfm.getKeyValidatorAsCType());
 +        this.clusteringColumnsRestrictions = new PrimaryKeyRestrictionSet(cfm.comparator);
 +        this.nonPrimaryKeyRestrictions = new RestrictionSet();
 +    }
 +
 +    /**
 +     * Creates the <code>StatementRestrictions</code> corresponding to the relations of a where-clause and
 +     * validates them. While doing so it determines whether the query is a partition key range query and whether
 +     * a secondary index must be used, and collects the restrictions used to build the index expressions.
 +     *
 +     * @param cfm the column family meta data
 +     * @param whereClause the relations of the where-clause
 +     * @param boundNames the bind variable specifications, handed to each relation when it is converted into a
 +     * restriction
 +     * @param selectsOnlyStaticColumns <code>true</code> if the query selects only static columns
 +     * @param selectACollection <code>true</code> if the query should return a collection column
 +     * @param allowFiltering <code>true</code> if the query is allowed to filter data
 +     * @throws InvalidRequestException if the restrictions are invalid
 +     */
 +    public StatementRestrictions(CFMetaData cfm,
 +                                 List<Relation> whereClause,
 +                                 VariableSpecifications boundNames,
 +                                 boolean selectsOnlyStaticColumns,
 +                                 boolean selectACollection,
 +                                 boolean allowFiltering)
 +    {
 +        // Start from the empty restrictions so that the fields are initialized in a single place.
 +        this(cfm);
 +
 +        /*
 +         * WHERE clause. For a given entity, rules are:
 +         * - EQ relation conflicts with anything else (including a 2nd EQ)
 +         * - Can't have more than one LT(E) relation (resp. GT(E) relation)
 +         * - IN relation are restricted to row keys (for now) and conflicts with anything else (we could allow
 +         *   two IN for the same entity but that doesn't seem very useful)
 +         * - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value
 +         *   in CQL so far)
 +         */
 +        for (Relation relation : whereClause)
 +            addRestriction(relation.toRestriction(cfm, boundNames));
 +
 +        SecondaryIndexManager secondaryIndexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName).indexManager;
 +        boolean hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
 +        boolean hasQueriableIndex = hasQueriableClusteringColumnIndex
 +                || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
 +                || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
 +
 +        // At this point, the select statement is fully constructed, but we still have a few things to validate
 +        processPartitionKeyRestrictions(hasQueriableIndex);
 +
 +        // Some but not all of the partition key columns have been specified;
 +        // hence we need turn these restrictions into index expressions.
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(partitionKeyRestrictions);
 +
 +        checkFalse(selectsOnlyStaticColumns && hasClusteringColumnsRestriction(),
 +                   "Cannot restrict clustering columns when selecting only static columns");
 +
 +        processClusteringColumnsRestrictions(hasQueriableIndex, selectACollection);
 +
 +        // Covers indexes on the first clustering column (among others).
 +        if (isKeyRange && hasQueriableClusteringColumnIndex)
 +            usesSecondaryIndexing = true;
 +
 +        usesSecondaryIndexing = usesSecondaryIndexing || clusteringColumnsRestrictions.isContains();
 +
 +        if (usesSecondaryIndexing)
 +            indexRestrictions.add(clusteringColumnsRestrictions);
 +
 +        // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
 +        // there is restrictions not covered by the PK.
 +        if (!nonPrimaryKeyRestrictions.isEmpty())
 +        {
 +            if (!hasQueriableIndex)
 +            {
 +                // Filtering for non-index query is only supported for thrift static CFs
 +                if (cfm.comparator.isDense() ||  cfm.comparator.isCompound())
 +                    throw invalidRequest("Predicates on non-primary-key columns (%s) are not yet supported for non secondary index queries",
 +                                         Joiner.on(", ").join(toIdentifiers(nonPrimaryKeyRestrictions.getColumnDefs())));
 +
 +                if (!allowFiltering)
 +                    throw invalidRequest(REQUIRES_ALLOW_FILTERING_MESSAGE);
 +            }
 +            usesSecondaryIndexing = true;
 +            indexRestrictions.add(nonPrimaryKeyRestrictions);
 +        }
 +
 +        if (usesSecondaryIndexing)
 +            validateSecondaryIndexSelections(selectsOnlyStaticColumns);
 +    }
 +
 +    /**
 +     * Dispatches a restriction to the set it belongs to: a multi-column restriction is merged into the
 +     * clustering column restrictions, a token restriction into the partition key restrictions, and any other
 +     * restriction is handled as a single column restriction.
 +     *
 +     * @param restriction the restriction to add
 +     * @throws InvalidRequestException if the restriction cannot be added
 +     */
 +    private void addRestriction(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +        {
 +            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
 +            return;
 +        }
 +
 +        if (restriction.isOnToken())
 +        {
 +            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
 +            return;
 +        }
 +
 +        addSingleColumnRestriction((SingleColumnRestriction) restriction);
 +    }
 +
 +    /**
 +     * Returns the functions reported by the partition key, clustering column and non-primary key restrictions,
 +     * concatenated in that order.
 +     *
 +     * @return the functions of all the restrictions
 +     */
 +    public Iterable<Function> getFunctions()
 +    {
 +        return Iterables.concat(partitionKeyRestrictions.getFunctions(),
 +                                clusteringColumnsRestrictions.getFunctions(),
 +                                nonPrimaryKeyRestrictions.getFunctions());
 +    }
 +
 +    /**
 +     * Adds a single column restriction to the set matching the kind of the restricted column: partition key,
 +     * clustering column or non-primary key column.
 +     *
 +     * @param restriction the single column restriction to add
 +     * @throws InvalidRequestException if the restriction cannot be added
 +     */
 +    private void addSingleColumnRestriction(SingleColumnRestriction restriction) throws InvalidRequestException
 +    {
 +        ColumnDefinition column = restriction.columnDef;
 +
 +        if (column.isPartitionKey())
 +        {
 +            partitionKeyRestrictions = partitionKeyRestrictions.mergeWith(restriction);
 +            return;
 +        }
 +
 +        if (column.isClusteringColumn())
 +        {
 +            clusteringColumnsRestrictions = clusteringColumnsRestrictions.mergeWith(restriction);
 +            return;
 +        }
 +
 +        nonPrimaryKeyRestrictions = nonPrimaryKeyRestrictions.addRestriction(restriction);
 +    }
 +
 +    /**
 +     * Checks if the restrictions on the partition key are an IN restriction.
 +     *
 +     * @return <code>true</code> if the restrictions on the partition key are an IN restriction,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean keyIsInRelation()
 +    {
 +        return partitionKeyRestrictions.isIN();
 +    }
 +
 +    /**
 +     * Checks if the query requests a range of partition keys.
 +     *
 +     * @return <code>true</code> if the query requests a range of partition keys, <code>false</code> otherwise.
 +     */
 +    public boolean isKeyRange()
 +    {
 +        return this.isKeyRange;
 +    }
 +
 +    /**
 +     * Checks if the secondary index needs to be queried.
 +     *
 +     * @return <code>true</code> if the secondary index needs to be queried, <code>false</code> otherwise.
 +     */
 +    public boolean usesSecondaryIndexing()
 +    {
 +        return this.usesSecondaryIndexing;
 +    }
 +
 +    /**
 +     * Processes the partition key restrictions: flags the query as a key range query when the restrictions are
 +     * on the token or when some partition key components are unrestricted, and records whether the secondary
 +     * index has to be used in the latter case.
 +     *
 +     * @param hasQueriableIndex <code>true</code> if some of the queried data are indexed, <code>false</code> otherwise
 +     * @throws InvalidRequestException if only a part of the partition key is restricted and no index can be used
 +     */
 +    private void processPartitionKeyRestrictions(boolean hasQueriableIndex) throws InvalidRequestException
 +    {
 +        // With a queriable index, no special condition is required on the other restrictions.
 +        // We nevertheless need to find out:
 +        // - whether the query is valid when no queriable index exists
 +        // - whether it can be served without a 2ndary index, which is always more efficient
 +        // If a component of the partition key is restricted by a relation, all preceding
 +        // components must have a EQ. Only the last partition key component can be in IN relation.
 +        if (partitionKeyRestrictions.isOnToken())
 +            isKeyRange = true;
 +
 +        if (!hasPartitionKeyUnrestrictedComponents())
 +            return;
 +
 +        boolean partiallyRestricted = !partitionKeyRestrictions.isEmpty();
 +        if (partiallyRestricted && !hasQueriableIndex)
 +            throw invalidRequest("Partition key parts: %s must be restricted as other parts are",
 +                                 Joiner.on(", ").join(getPartitionKeyUnrestrictedComponents()));
 +
 +        isKeyRange = true;
 +        usesSecondaryIndexing = hasQueriableIndex;
 +    }
 +
 +    /**
 +     * Checks if the partition key has some unrestricted components, i.e. if fewer columns are restricted than
 +     * the partition key contains.
 +     *
 +     * @return <code>true</code> if the partition key has some unrestricted components, <code>false</code> otherwise.
 +     */
 +    private boolean hasPartitionKeyUnrestrictedComponents()
 +    {
 +        return partitionKeyRestrictions.size() <  cfm.partitionKeyColumns().size();
 +    }
 +
 +    /**
 +     * Checks if the query has some restrictions on the partition key.
 +     *
 +     * @return <code>true</code> if the partition key restrictions are not empty, <code>false</code> otherwise.
 +     */
 +    public boolean hasPartitionKeyRestrictions()
 +    {
 +        return !partitionKeyRestrictions.isEmpty();
 +    }
 +
 +    /**
 +     * Checks if the restrictions contain any non-primary key restrictions.
 +     *
 +     * @return <code>true</code> if the restrictions contain any non-primary key restrictions,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean hasNonPrimaryKeyRestrictions()
 +    {
 +        return !nonPrimaryKeyRestrictions.isEmpty();
 +    }
 +
 +    /**
 +     * Computes the identifiers of the partition key columns on which no restriction has been specified.
 +     *
 +     * @return the identifiers of the unrestricted partition key components.
 +     */
 +    private List<ColumnIdentifier> getPartitionKeyUnrestrictedComponents()
 +    {
 +        // Work on a copy: the column list of the table meta data must not be modified.
 +        List<ColumnDefinition> unrestricted = new ArrayList<>(cfm.partitionKeyColumns());
 +        unrestricted.removeAll(partitionKeyRestrictions.getColumnDefs());
 +
 +        return toIdentifiers(unrestricted);
 +    }
 +
 +    /**
 +     * Processes the clustering column restrictions.
 +     * <p>
 +     * When the restrictions need filtering, the secondary index is used if one is available; otherwise the
 +     * restricted columns must match the leading clustering columns one by one.
 +     *
 +     * @param hasQueriableIndex <code>true</code> if some of the queried data are indexed, <code>false</code> otherwise
 +     * @param selectACollection <code>true</code> if the query should return a collection column
 +     * @throws InvalidRequestException if the request is invalid
 +     */
 +    private void processClusteringColumnsRestrictions(boolean hasQueriableIndex,
 +                                                      boolean selectACollection) throws InvalidRequestException
 +    {
 +        validateClusteringRestrictions(hasQueriableIndex);
 +
 +        checkFalse(clusteringColumnsRestrictions.isIN() && selectACollection,
 +                   "Cannot restrict clustering columns by IN relations when a collection is selected by the query");
 +        checkFalse(clusteringColumnsRestrictions.isContains() && !hasQueriableIndex,
 +                   "Cannot restrict clustering columns by a CONTAINS relation without a secondary index");
 +
 +        if (hasClusteringColumnsRestriction() && clusteringRestrictionsNeedFiltering())
 +        {
 +            if (hasQueriableIndex)
 +            {
 +                usesSecondaryIndexing = true;
 +                return;
 +            }
 +
 +            List<ColumnDefinition> clusteringColumns = cfm.clusteringColumns();
 +            // The restricted columns are accessed by position below: copy them into an ArrayList so that
 +            // get(i) is a constant time operation (a LinkedList would make the loop quadratic).
 +            List<ColumnDefinition> restrictedColumns = new ArrayList<>(clusteringColumnsRestrictions.getColumnDefs());
 +
 +            for (int i = 0, m = restrictedColumns.size(); i < m; i++)
 +            {
 +                ColumnDefinition clusteringColumn = clusteringColumns.get(i);
 +                ColumnDefinition restrictedColumn = restrictedColumns.get(i);
 +
 +                // The first mismatch reveals a clustering column that was skipped by the restrictions.
 +                if (!clusteringColumn.equals(restrictedColumn))
 +                {
 +                    throw invalidRequest(
 +                              "PRIMARY KEY column \"%s\" cannot be restricted as preceding column \"%s\" is not restricted",
 +                              restrictedColumn.name,
 +                              clusteringColumn.name);
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Validates whether or not restrictions are allowed for execution when secondary index is not used.
 +     */
 +    public final void validateClusteringRestrictions(boolean hasQueriableIndex)
 +    {
 +        assert clusteringColumnsRestrictions instanceof PrimaryKeyRestrictionSet;
 +
 +        // If there's a queriable index, filtering will take care of clustering restrictions
 +        if (hasQueriableIndex)
 +            return;
 +
 +        Iterator<Restriction> iter = ((PrimaryKeyRestrictionSet)clusteringColumnsRestrictions).iterator();
 +        Restriction previousRestriction = null;
 +
 +        while (iter.hasNext())
 +        {
 +            Restriction restriction = iter.next();
 +
 +            if (previousRestriction != null)
 +            {
 +                ColumnDefinition lastRestrictionStart = previousRestriction.getFirstColumn();
 +                ColumnDefinition newRestrictionStart = restriction.getFirstColumn();
 +
 +                if (previousRestriction.isSlice() && newRestrictionStart.position() > lastRestrictionStart.position())
 +                    throw invalidRequest("Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                                         newRestrictionStart.name,
 +                                         lastRestrictionStart.name);
 +            }
 +            previousRestriction = restriction;
 +        }
 +    }
 +
 +    /**
 +     * Checks if the clustering column restrictions need filtering, as reported by the underlying
 +     * <code>PrimaryKeyRestrictionSet</code>.
 +     *
 +     * @return <code>true</code> if the clustering column restrictions need filtering, <code>false</code> otherwise.
 +     */
 +    public final boolean clusteringRestrictionsNeedFiltering()
 +    {
 +        assert clusteringColumnsRestrictions instanceof PrimaryKeyRestrictionSet;
 +        return ((PrimaryKeyRestrictionSet) clusteringColumnsRestrictions).needsFiltering();
 +    }
 +
 +    /**
 +     * Builds the index expressions corresponding to the restrictions that must be served by a secondary index.
 +     *
 +     * @param indexManager the secondary index manager
 +     * @param options the query options
 +     * @return the index expressions, or an empty list if the query does not use secondary indexing
 +     * @throws InvalidRequestException if the index expressions cannot be built
 +     */
 +    public List<IndexExpression> getIndexExpressions(SecondaryIndexManager indexManager,
 +                                                     QueryOptions options) throws InvalidRequestException
 +    {
 +        if (usesSecondaryIndexing && !indexRestrictions.isEmpty())
 +        {
 +            List<IndexExpression> result = new ArrayList<>();
 +            for (Restrictions indexed : indexRestrictions)
 +                indexed.addIndexExpressionTo(result, indexManager, options);
 +
 +            return result;
 +        }
 +
 +        return Collections.emptyList();
 +    }
 +
 +    /**
 +     * Returns the partition keys for which the data is requested.
 +     *
 +     * @param options the query options
 +     * @return the partition keys for which the data is requested, as computed by the partition key restrictions.
 +     * @throws InvalidRequestException if the partition keys cannot be retrieved
 +     */
 +    public Collection<ByteBuffer> getPartitionKeys(final QueryOptions options) throws InvalidRequestException
 +    {
-         return partitionKeyRestrictions.values(options);
++        return partitionKeyRestrictions.values(cfm, options);
 +    }
 +
 +    /**
 +     * Returns the specified bound of the partition key.
 +     *
 +     * @param b the boundary type
 +     * @param options the query options
 +     * @return the specified bound of the partition key, or an empty buffer if some partition key components
 +     * are unrestricted
 +     * @throws InvalidRequestException if the boundary cannot be retrieved
 +     */
 +    private ByteBuffer getPartitionKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries
 +        // on the first component of a composite partition key).
 +        if (hasPartitionKeyUnrestrictedComponents())
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        // We deal with IN queries for keys in other places, so we know bounds(...) will return only one result
-         return partitionKeyRestrictions.bounds(b, options).get(0);
++        return partitionKeyRestrictions.bounds(cfm, b, options).get(0);
 +    }
 +
 +    /**
 +     * Computes the bounds of the partition keys selected by the query, using the token restrictions when the
 +     * partition key is restricted on its token and the key restrictions otherwise.
 +     *
 +     * @param options the query options
 +     * @return the partition key bounds
 +     * @throws InvalidRequestException if the query is invalid
 +     */
 +    public AbstractBounds<RowPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException
 +    {
 +        IPartitioner partitioner = StorageService.getPartitioner();
 +
 +        return partitionKeyRestrictions.isOnToken()
 +                ? getPartitionKeyBoundsForTokenRestrictions(partitioner, options)
 +                : getPartitionKeyBounds(partitioner, options);
 +    }
 +
 +    /**
 +     * Computes the partition key bounds from the restrictions on the partition key columns.
 +     *
 +     * @param p the partitioner
 +     * @param options the query options
 +     * @return the partition key bounds, or <code>null</code> if the start key is greater than the end key and
 +     * the end key is not the minimum
 +     * @throws InvalidRequestException if the bounds cannot be retrieved
 +     */
 +    private AbstractBounds<RowPosition> getPartitionKeyBounds(IPartitioner p,
 +                                                              QueryOptions options) throws InvalidRequestException
 +    {
 +        ByteBuffer startBytes = getPartitionKeyBound(Bound.START, options);
 +        ByteBuffer endBytes = getPartitionKeyBound(Bound.END, options);
 +
 +        RowPosition startKey = RowPosition.ForKey.get(startBytes, p);
 +        RowPosition finishKey = RowPosition.ForKey.get(endBytes, p);
 +
 +        boolean wrapsAround = startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum();
 +        if (wrapsAround)
 +            return null;
 +
 +        boolean startInclusive = partitionKeyRestrictions.isInclusive(Bound.START);
 +        boolean endInclusive = partitionKeyRestrictions.isInclusive(Bound.END);
 +
 +        // Pick the bounds implementation matching the inclusiveness of each side.
 +        if (startInclusive && endInclusive)
 +            return new Bounds<>(startKey, finishKey);
 +
 +        if (startInclusive)
 +            return new IncludingExcludingBounds<>(startKey, finishKey);
 +
 +        if (endInclusive)
 +            return new Range<>(startKey, finishKey);
 +
 +        return new ExcludingBounds<>(startKey, finishKey);
 +    }
 +
 +    /**
 +     * Computes the partition key bounds from the restrictions on the token of the partition key.
 +     *
 +     * @param p the partitioner
 +     * @param options the query options
 +     * @return the partition key bounds, or <code>null</code> if the token restrictions select an empty range
 +     * @throws InvalidRequestException if the token bounds cannot be retrieved
 +     */
 +    private AbstractBounds<RowPosition> getPartitionKeyBoundsForTokenRestrictions(IPartitioner p,
 +                                                                                  QueryOptions options)
 +                                                                                          throws InvalidRequestException
 +    {
 +        Token startToken = getTokenBound(Bound.START, options, p);
 +        Token endToken = getTokenBound(Bound.END, options, p);
 +
 +        boolean includeStart = partitionKeyRestrictions.isInclusive(Bound.START);
 +        boolean includeEnd = partitionKeyRestrictions.isInclusive(Bound.END);
 +
 +        /*
 +         * If we ask SP.getRangeSlice() for (token(200), token(200)], it will happily return the whole ring.
 +         * However, wrapping range doesn't really make sense for CQL, and we want to return an empty result in that
 +         * case (CASSANDRA-5573). So special case to create a range that is guaranteed to be empty.
 +         *
 +         * In practice, we want to return an empty result set if either startToken > endToken, or both are equal but
 +         * one of the bounds is excluded (since [a, a] can contain something, but not (a, a], [a, a) or (a, a)).
 +         * Note though that in the case where startToken or endToken is the minimum token, then this special case
 +         * rule should not apply.
 +         */
 +        int cmp = startToken.compareTo(endToken);
 +        if (!startToken.isMinimum() && !endToken.isMinimum()
 +                && (cmp > 0 || (cmp == 0 && (!includeStart || !includeEnd))))
 +            return null;
 +
 +        // The inclusiveness of each side is expressed through the choice of the min or max key bound of the token.
 +        RowPosition start = includeStart ? startToken.minKeyBound() : startToken.maxKeyBound();
 +        RowPosition end = includeEnd ? endToken.maxKeyBound() : endToken.minKeyBound();
 +
 +        return new Range<>(start, end);
 +    }
 +
 +    /**
 +     * Returns the token corresponding to the specified bound of the token restrictions.
 +     *
 +     * @param b the boundary type
 +     * @param options the query options
 +     * @param p the partitioner
 +     * @return the token of the specified bound, or the minimum token of the partitioner if the restrictions
 +     * have no such bound
 +     * @throws InvalidRequestException if the bound value is <code>null</code>
 +     */
 +    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException
 +    {
 +        if (!partitionKeyRestrictions.hasBound(b))
 +            return p.getMinimumToken();
 +
-         ByteBuffer value = partitionKeyRestrictions.bounds(b, options).get(0);
++        ByteBuffer value = partitionKeyRestrictions.bounds(cfm, b, options).get(0);
 +        checkNotNull(value, "Invalid null token value");
 +        return p.getTokenFactory().fromByteArray(value);
 +    }
 +
 +    /**
 +     * Checks if the query does not contain any restriction on the clustering columns.
 +     *
 +     * @return <code>true</code> if the query does not contain any restriction on the clustering columns,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean hasNoClusteringColumnsRestriction()
 +    {
 +        return clusteringColumnsRestrictions.isEmpty();
 +    }
 +
 +    /**
 +     * Checks if the query has some restrictions on the clustering columns.
 +     * This is the negation of {@link #hasNoClusteringColumnsRestriction()}.
 +     *
 +     * @return <code>true</code> if the query has some restrictions on the clustering columns,
 +     * <code>false</code> otherwise.
 +     */
 +    public boolean hasClusteringColumnsRestriction()
 +    {
 +        return !clusteringColumnsRestrictions.isEmpty();
 +    }
 +
 +    // For non-composite slices, we don't support internally the difference between exclusive and
 +    // inclusive bounds, so we deal with it manually.
 +    public boolean isNonCompositeSliceWithExclusiveBounds()
 +    {
 +        return !cfm.comparator.isCompound()
 +                && clusteringColumnsRestrictions.isSlice()
 +                && (!clusteringColumnsRestrictions.isInclusive(Bound.START) || !clusteringColumnsRestrictions.isInclusive(Bound.END));
 +    }
 +
 +    /**
 +     * Returns the requested clustering columns as <code>Composite</code>s.
 +     *
 +     * @param options the query options
 +     * @return the requested clustering columns as <code>Composite</code>s, as computed by the clustering column
 +     * restrictions
 +     * @throws InvalidRequestException if the query is not valid
 +     */
 +    public List<Composite> getClusteringColumnsAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
-         return clusteringColumnsRestrictions.valuesAsComposites(options);
++        return clusteringColumnsRestrictions.valuesAsComposites(cfm, options);
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns as <code>Composites</code>.
 +     * Every non-empty bound is validated against the table comparator before the bounds are returned.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns as <code>Composites</code>
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<Composite> getClusteringColumnsBoundsAsComposites(Bound b,
 +                                                                  QueryOptions options) throws InvalidRequestException
 +    {
-         List<Composite> bounds = clusteringColumnsRestrictions.boundsAsComposites(b, options);
++        List<Composite> bounds = clusteringColumnsRestrictions.boundsAsComposites(cfm, b, options);
 +        for (Composite c : bounds) {
 +            // Empty composites are skipped: only non-empty bounds are validated.
 +            if (!c.isEmpty())
 +                QueryProcessor.validateComposite(c, cfm.comparator);
 +        }
 +        return bounds;
 +    }
 +
 +    /**
 +     * Returns the bounds (start or end) of the clustering columns.
 +     *
 +     * @param b the bound type
 +     * @param options the query options
 +     * @return the bounds (start or end) of the clustering columns, as computed by the clustering column
 +     * restrictions
 +     * @throws InvalidRequestException if the request is not valid
 +     */
 +    public List<ByteBuffer> getClusteringColumnsBounds(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
-         return clusteringColumnsRestrictions.bounds(b, options);
++        return clusteringColumnsRestrictions.bounds(cfm, b, options);
 +    }
 +
 +    /**
 +     * Checks if the specified bound (start or end) of the clustering columns is inclusive.
 +     *
 +     * @param bound the bound type
 +     * @return <code>true</code> if the specified bound (start or end) of the clustering columns is inclusive,
 +     * <code>false</code> otherwise
 +     */
 +    public boolean areRequestedBoundsInclusive(Bound bound)
 +    {
 +        return clusteringColumnsRestrictions.isInclusive(bound);
 +    }
 +
 +    /**
 +     * Checks if the query returns a range of columns.
 +     *
 +     * @return <code>true</code> if the query returns a range of columns, <code>false</code> otherwise.
 +     */
 +    public boolean isColumnRange()
 +    {
 +        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
 +        // Static CF (non dense but non composite) never entails a column slice however
 +        if (!cfm.comparator.isDense())
 +            return cfm.comparator.isCompound();
 +
 +        // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about
 +        // CASSANDRA-5762),
 +        // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
 +        return clusteringColumnsRestrictions.size() < cfm.clusteringColumns().size() || clusteringColumnsRestrictions.isSlice();
 +    }
 +
 +    /**
 +     * Determines whether executing the query requires filtering, based on the number of columns restricted
 +     * through the index restrictions.
 +     *
 +     * @return <code>true</code> if the query needs to use filtering, <code>false</code> otherwise.
 +     */
 +    public boolean needFiltering()
 +    {
 +        int restrictedColumnCount = 0;
 +        for (Restrictions indexed : indexRestrictions)
 +            restrictedColumnCount += indexed.size();
 +
 +        // More than one restricted column always requires filtering.
 +        if (restrictedColumnCount > 1)
 +            return true;
 +
 +        // No index restriction at all: filtering is only needed if clustering columns are restricted.
 +        if (restrictedColumnCount == 0)
 +            return !clusteringColumnsRestrictions.isEmpty();
 +
 +        return nonPrimaryKeyRestrictions.hasMultipleContains();
 +    }
 +
 +    /**
 +     * Validates the query once it is known to use a secondary index: it must neither restrict the partition key
 +     * by an IN relation nor select only static columns.
 +     *
 +     * @param selectsOnlyStaticColumns <code>true</code> if the query selects only static columns
 +     * @throws InvalidRequestException if one of the two conditions is violated
 +     */
 +    private void validateSecondaryIndexSelections(boolean selectsOnlyStaticColumns) throws InvalidRequestException
 +    {
 +        checkFalse(keyIsInRelation(),
 +                   "Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
 +        // When the user only select static columns, the intent is that we don't query the whole partition but just
 +        // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index
 +        // on static columns so far, 2i means that you've restricted a non static column, so the query is somewhat
 +        // non-sensical.
 +        checkFalse(selectsOnlyStaticColumns, "Queries using 2ndary indexes don't support selecting only static columns");
 +    }
 +
 +    /**
 +     * Wraps the current clustering column restrictions into a <code>ReversedPrimaryKeyRestrictions</code>.
 +     */
 +    public void reverse()
 +    {
 +        clusteringColumnsRestrictions = new ReversedPrimaryKeyRestrictions(clusteringColumnsRestrictions);
 +    }
++
++    /**
++     * Checks if the query will never return any rows, as determined by the clustering column restrictions.
++     *
++     * @param options the query options
++     * @return {@code true} if the query will never return any rows, {@code false} otherwise
++     */
++    public boolean isNotReturningAnyRows(QueryOptions options)
++    {
++        return clusteringColumnsRestrictions.isNotReturningAnyRows(cfm, options);
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9583b6b3/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index bd04610,0000000..18444ec
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@@ -1,237 -1,0 +1,245 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import com.google.common.collect.BoundType;
 +import com.google.common.collect.ImmutableRangeSet;
 +import com.google.common.collect.Range;
 +import com.google.common.collect.RangeSet;
 +
++import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static org.apache.cassandra.cql3.statements.Bound.END;
 +import static org.apache.cassandra.cql3.statements.Bound.START;
 +
 +/**
 + * <code>Restriction</code> decorator used to merge non-token restriction and token restriction on partition keys.
 + */
 +final class TokenFilter extends ForwardingPrimaryKeyRestrictions
 +{
 +    /**
 +     * The decorated restriction
 +     */
 +    private PrimaryKeyRestrictions restrictions;
 +
 +    /**
 +     * The restriction on the token
 +     */
 +    private TokenRestriction tokenRestriction;
 +
 +    /**
 +     * The partitioner
 +     */
 +    private static final IPartitioner partitioner = StorageService.getPartitioner();
 +
 +    @Override
 +    protected PrimaryKeyRestrictions getDelegate()
 +    {
 +        return restrictions;
 +    }
 +
 +    @Override
 +    public boolean isOnToken()
 +    {
 +        // if all partition key columns have non-token restrictions, we can simply use the token range to filter
 +        // those restrictions and then ignore the token range
 +        return restrictions.size() < tokenRestriction.size();
 +    }
 +
 +    public TokenFilter(PrimaryKeyRestrictions restrictions, TokenRestriction tokenRestriction)
 +    {
 +        this.restrictions = restrictions;
 +        this.tokenRestriction = tokenRestriction;
 +    }
 +
 +    @Override
-     public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
++    public List<ByteBuffer> values(CFMetaData cfm, QueryOptions options) throws InvalidRequestException
 +    {
-         return filter(restrictions.values(options), options);
++        return filter(cfm, restrictions.values(cfm, options), options);
 +    }
 +
 +    @Override
-     public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
++    public List<Composite> valuesAsComposites(CFMetaData cfm, QueryOptions options) throws InvalidRequestException
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isOnToken())
 +            return new TokenFilter(restrictions, (TokenRestriction) tokenRestriction.mergeWith(restriction));
 +
 +        return new TokenFilter(super.mergeWith(restriction), tokenRestriction);
 +    }
 +
 +    @Override
 +    public boolean isInclusive(Bound bound)
 +    {
 +        return tokenRestriction.isInclusive(bound);
 +    }
 +
 +    @Override
 +    public boolean hasBound(Bound b)
 +    {
 +        return tokenRestriction.hasBound(b);
 +    }
 +
 +    @Override
-     public List<ByteBuffer> bounds(Bound bound, QueryOptions options) throws InvalidRequestException
++    public List<ByteBuffer> bounds(CFMetaData cfm, Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
-         return tokenRestriction.bounds(bound, options);
++        return tokenRestriction.bounds(cfm, bound, options);
 +    }
 +
 +    @Override
-     public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
++    public List<Composite> boundsAsComposites(CFMetaData cfm, Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
-         return tokenRestriction.boundsAsComposites(bound, options);
++        return tokenRestriction.boundsAsComposites(cfm, bound, options);
 +    }
 +
 +    /**
 +     * Filter the values returned by the restriction.
 +     *
++     * @param cfm the table metadata
 +     * @param values the values returned by the decorated restriction
 +     * @param options the query options
 +     * @return the values matching the token restriction
 +     * @throws InvalidRequestException if the request is invalid
 +     */
-     private List<ByteBuffer> filter(List<ByteBuffer> values, QueryOptions options) throws InvalidRequestException
++    private List<ByteBuffer> filter(CFMetaData cfm, List<ByteBuffer> values, QueryOptions options) throws InvalidRequestException
 +    {
-         RangeSet<Token> rangeSet = tokenRestriction.isSlice() ? toRangeSet(tokenRestriction, options)
-                                                               : toRangeSet(tokenRestriction.values(options));
++        RangeSet<Token> rangeSet = tokenRestriction.isSlice() ? toRangeSet(cfm, tokenRestriction, options)
++                                                              : toRangeSet(tokenRestriction.values(cfm, options));
 +
 +        return filterWithRangeSet(rangeSet, values);
 +    }
 +
 +    /**
 +     * Filter out the values for which the tokens are not included within the specified range.
 +     *
 +     * @param tokens the tokens range
 +     * @param values the restricted values
 +     * @return the values for which the tokens are included within the specified range.
 +     */
 +    private static List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values)
 +    {
 +        List<ByteBuffer> remaining = new ArrayList<>();
 +
 +        for (ByteBuffer value : values)
 +        {
 +            Token token = partitioner.getToken(value);
 +
 +            if (!tokens.contains(token))
 +                continue;
 +
 +            remaining.add(value);
 +        }
 +        return remaining;
 +    }
 +
 +    /**
 +     * Converts the specified list into a range set.
 +     *
 +     * @param buffers the token restriction values
 +     * @return the range set corresponding to the specified list
 +     */
 +    private static RangeSet<Token> toRangeSet(List<ByteBuffer> buffers)
 +    {
 +        ImmutableRangeSet.Builder<Token> builder = ImmutableRangeSet.builder();
 +
 +        for (ByteBuffer buffer : buffers)
 +            builder.add(Range.singleton(deserializeToken(buffer)));
 +
 +        return builder.build();
 +    }
 +
 +    /**
 +     * Converts the specified slice into a range set.
 +     *
++     * @param cfm the table metadata
 +     * @param slice the slice to convert
 +     * @param options the query options
 +     * @return the range set corresponding to the specified slice
 +     * @throws InvalidRequestException if the request is invalid
 +     */
-     private static RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException
++    private static RangeSet<Token> toRangeSet(CFMetaData cfm, TokenRestriction slice, QueryOptions options) throws InvalidRequestException
 +    {
 +        if (slice.hasBound(START))
 +        {
-             Token start = deserializeToken(slice.bounds(START, options).get(0));
++            Token start = deserializeToken(slice.bounds(cfm, START, options).get(0));
 +
 +            BoundType startBoundType = toBoundType(slice.isInclusive(START));
 +
 +            if (slice.hasBound(END))
 +            {
 +                BoundType endBoundType = toBoundType(slice.isInclusive(END));
-                 Token end = deserializeToken(slice.bounds(END, options).get(0));
++                Token end = deserializeToken(slice.bounds(cfm, END, options).get(0));
 +
 +                if (start.equals(end) && (BoundType.OPEN == startBoundType || BoundType.OPEN == endBoundType))
 +                    return ImmutableRangeSet.of();
 +
 +                if (start.compareTo(end) <= 0)
 +                    return ImmutableRangeSet.of(Range.range(start,
 +                                                            startBoundType,
 +                                                            end,
 +                                                            endBoundType));
 +
 +                return ImmutableRangeSet.<Token> builder()
 +                                        .add(Range.upTo(end, endBoundType))
 +                                        .add(Range.downTo(start, startBoundType))
 +                                        .build();
 +            }
 +            return ImmutableRangeSet.of(Range.downTo(start,
 +                                                     startBoundType));
 +        }
-         Token end = deserializeToken(slice.bounds(END, options).get(0));
++        Token end = deserializeToken(slice.bounds(cfm, END, options).get(0));
 +        return ImmutableRangeSet.of(Range.upTo(end, toBoundType(slice.isInclusive(END))));
 +    }
 +
++    public boolean isNotReturningAnyRows(CFMetaData cfm, QueryOptions options)
++    {
++        return false;
++    }
++
 +    /**
 +     * Deserializes the token corresponding to the specified buffer.
 +     *
 +     * @param buffer the buffer
 +     * @return the token corresponding to the specified buffer
 +     */
 +    private static Token deserializeToken(ByteBuffer buffer)
 +    {
 +        return partitioner.getTokenFactory().fromByteArray(buffer);
 +    }
 +
 +    private static BoundType toBoundType(boolean inclusive)
 +    {
 +        return inclusive ? BoundType.CLOSED : BoundType.OPEN;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9583b6b3/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index 3cd3304,0000000..97c55c4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@@ -1,273 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import com.google.common.base.Joiner;
 +
++import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.functions.Function;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +/**
 + * <code>Restriction</code> using the token function.
 + */
 +public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The definition of the columns to which the token restriction applies.
 +     */
 +    protected final List<ColumnDefinition> columnDefs;
 +
 +    /**
 +     * Creates a new <code>TokenRestriction</code> that applies to the specified columns.
 +     *
 +     * @param ctype the composite type
 +     * @param columnDefs the definition of the columns to which the token restriction applies
 +     */
 +    public TokenRestriction(CType ctype, List<ColumnDefinition> columnDefs)
 +    {
 +        super(ctype);
 +        this.columnDefs = columnDefs;
 +    }
 +
 +    @Override
 +    public  boolean isOnToken()
 +    {
 +        return true;
 +    }
 +
 +    @Override
 +    public List<ColumnDefinition> getColumnDefs()
 +    {
 +        return columnDefs;
 +    }
 +
 +    @Override
 +    public ColumnDefinition getFirstColumn()
 +    {
 +        return columnDefs.get(0);
 +    }
 +
 +    @Override
 +    public ColumnDefinition getLastColumn()
 +    {
 +        return columnDefs.get(columnDefs.size() - 1);
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public final void addIndexExpressionTo(List<IndexExpression> expressions,
 +                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options)
 +    {
 +        throw new UnsupportedOperationException("Index expression cannot be created for token restriction");
 +    }
 +
 +    @Override
-     public CompositesBuilder appendTo(CompositesBuilder builder, QueryOptions options)
++    public CompositesBuilder appendTo(CFMetaData cfm, CompositesBuilder builder, QueryOptions options)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
-     public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
++    public List<Composite> valuesAsComposites(CFMetaData cfm, QueryOptions options) throws InvalidRequestException
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
-     public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
++    public List<Composite> boundsAsComposites(CFMetaData cfm, Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    /**
 +     * Returns the column names as a comma separated <code>String</code>.
 +     *
 +     * @return the column names as a comma separated <code>String</code>.
 +     */
 +    protected final String getColumnNamesAsString()
 +    {
 +        return Joiner.on(", ").join(ColumnDefinition.toIdentifiers(columnDefs));
 +    }
 +
 +    @Override
 +    public final PrimaryKeyRestrictions mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +    {
 +        if (!otherRestriction.isOnToken())
 +            return new TokenFilter(toPrimaryKeyRestriction(otherRestriction), this);
 +
 +        return doMergeWith((TokenRestriction) otherRestriction);
 +    }
 +
 +    /**
 +     * Merges this restriction with the specified <code>TokenRestriction</code>.
 +     * @param otherRestriction the <code>TokenRestriction</code> to merge with.
 +     */
 +    protected abstract PrimaryKeyRestrictions doMergeWith(TokenRestriction otherRestriction) throws InvalidRequestException;
 +
 +    /**
 +     * Converts the specified restriction into a <code>PrimaryKeyRestrictions</code>.
 +     *
 +     * @param restriction the restriction to convert
 +     * @return a <code>PrimaryKeyRestrictions</code>
 +     * @throws InvalidRequestException if a problem occurs while converting the restriction
 +     */
 +    private PrimaryKeyRestrictions toPrimaryKeyRestriction(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction instanceof PrimaryKeyRestrictions)
 +            return (PrimaryKeyRestrictions) restriction;
 +
 +        return new PrimaryKeyRestrictionSet(ctype).mergeWith(restriction);
 +    }
 +
 +    public static final class EQ extends TokenRestriction
 +    {
 +        private final Term value;
 +
 +        public EQ(CType ctype, List<ColumnDefinition> columnDefs, Term value)
 +        {
 +            super(ctype, columnDefs);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public boolean isEQ()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return value.getFunctions();
 +        }
 +
 +        @Override
 +        protected PrimaryKeyRestrictions doMergeWith(TokenRestriction otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal",
 +                                 Joiner.on(", ").join(ColumnDefinition.toIdentifiers(columnDefs)));
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
++        public List<ByteBuffer> values(CFMetaData cfm, QueryOptions options) throws InvalidRequestException
 +        {
 +            return Collections.singletonList(value.bindAndGet(options));
 +        }
 +    }
 +
 +    public static class Slice extends TokenRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(CType ctype, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
 +        {
 +            super(ctype, columnDefs);
 +            slice = TermSlice.newInstance(bound, inclusive, term);
 +        }
 +
 +        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
-         public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
++        public List<ByteBuffer> values(CFMetaData cfm, QueryOptions options) throws InvalidRequestException
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
-         public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
++        public List<ByteBuffer> bounds(CFMetaData cfm, Bound b, QueryOptions options) throws InvalidRequestException
 +        {
 +            return Collections.singletonList(slice.bound(b).bindAndGet(options));
 +        }
 +
 +        @Override
 +        public Iterable<Function> getFunctions()
 +        {
 +            return slice.getFunctions();
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
 +        protected PrimaryKeyRestrictions doMergeWith(TokenRestriction otherRestriction)
 +        throws InvalidRequestException
 +        {
 +            if (!otherRestriction.isSlice())
 +                throw invalidRequest("Columns \"%s\" cannot be restricted by both an equality and an inequality relation",
 +                                     getColumnNamesAsString());
 +
 +            TokenRestriction.Slice otherSlice = (TokenRestriction.Slice) otherRestriction;
 +
 +            if (hasBound(Bound.START) && otherSlice.hasBound(Bound.START))
 +                throw invalidRequest("More than one restriction was found for the start bound on %s",
 +                                     getColumnNamesAsString());
 +
 +            if (hasBound(Bound.END) && otherSlice.hasBound(Bound.END))
 +                throw invalidRequest("More than one restriction was found for the end bound on %s",
 +                                     getColumnNamesAsString());
 +
 +            return new Slice(ctype, columnDefs,  slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("SLICE%s", slice);
 +        }
 +
 +        private Slice(CType ctype, List<ColumnDefinition> columnDefs, TermSlice slice)
 +        {
 +            super(ctype, columnDefs);
 +            this.slice = slice;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9583b6b3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 5ffcc8a,f84188a..3d134b5
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -311,23 -301,38 +311,23 @@@ public abstract class ModificationState
      public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
      throws InvalidRequestException
      {
 -        CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
 -        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
 +        CompositesBuilder keyBuilder = new CompositesBuilder(cfm.getKeyValidatorAsCType());
          for (ColumnDefinition def : cfm.partitionKeyColumns())
          {
 -            Restriction r = processedKeys.get(def.name);
 -            if (r == null)
 -                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
 -
 -            List<ByteBuffer> values = r.values(options);
 +            Restriction r = checkNotNull(processedKeys.get(def.name), "Missing mandatory PRIMARY KEY part %s", def.name);
-             r.appendTo(keyBuilder, options);
++            r.appendTo(cfm, keyBuilder, options);
 +        }
  
 -            if (keyBuilder.remainingCount() == 1)
 -            {
 -                for (ByteBuffer val : values)
 -                {
 -                    if (val == null)
 -                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
 -                    ByteBuffer key = keyBuilder.buildWith(val).toByteBuffer();
 -                    ThriftValidation.validateKey(cfm, key);
 -                    keys.add(key);
 -                }
 -            }
 -            else
 +        return Lists.transform(filterAndSort(keyBuilder.build()), new com.google.common.base.Function<Composite, ByteBuffer>()
 +        {
 +            @Override
 +            public ByteBuffer apply(Composite composite)
              {
 -                if (values.size() != 1)
 -                    throw new InvalidRequestException("IN is only supported on the last column of the partition key");
 -                ByteBuffer val = values.get(0);
 -                if (val == null)
 -                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
 -                keyBuilder.add(val);
 +                ByteBuffer byteBuffer = composite.toByteBuffer();
 +                ThriftValidation.validateKey(cfm, byteBuffer);
 +                return byteBuffer;
              }
 -        }
 -        return keys;
 +        });
      }
  
      public Composite createClusteringPrefix(QueryOptions options)
@@@ -406,26 -411,15 +406,26 @@@
              }
              else
              {
-                 r.appendTo(builder, options);
 -                List<ByteBuffer> values = r.values(options);
 -                assert values.size() == 1; // We only allow IN for row keys so far
 -                ByteBuffer val = values.get(0);
 -                if (val == null)
 -                    throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
 -                builder.add(val);
++                r.appendTo(cfm, builder, options);
              }
          }
 -        return builder.build();
 +        return builder.build().get(0); // We only allow IN for row keys so far
 +    }
 +
 +    /**
 +     * Removes duplicates and sorts the specified composites.
 +     *
 +     * @param composites the composites to filter and sort
 +     * @return the composites sorted and without duplicates
 +     */
 +    private List<Composite> filterAndSort(List<Composite> composites)
 +    {
 +        if (composites.size() <= 1)
 +            return composites;
 +
 +        TreeSet<Composite> set = new TreeSet<Composite>(cfm.getKeyValidatorAsCType());
 +        set.addAll(composites);
 +        return new ArrayList<>(set);
      }
  
      protected ColumnDefinition getFirstEmptyKey()