You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/02/10 22:11:08 UTC

[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbc38cd3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbc38cd3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbc38cd3

Branch: refs/heads/trunk
Commit: fbc38cd3a2dbda77aeca4a84765550fc571031ad
Parents: 187624b 07ffe1b
Author: Tyler Hobbs <ty...@apache.org>
Authored: Tue Feb 10 15:10:49 2015 -0600
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Tue Feb 10 15:10:49 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../ForwardingPrimaryKeyRestrictions.java       |   3 +-
 .../restrictions/MultiColumnRestriction.java    |  63 ++++++------
 .../cql3/restrictions/Restriction.java          |   2 +
 .../cql3/restrictions/Restrictions.java         |   2 +
 .../SingleColumnPrimaryKeyRestrictions.java     |  26 ++++-
 .../restrictions/SingleColumnRestriction.java   |   6 ++
 .../restrictions/SingleColumnRestrictions.java  |   3 +-
 .../restrictions/StatementRestrictions.java     |  33 +-----
 .../cql3/restrictions/TokenRestriction.java     |   4 +-
 .../cql3/statements/SelectStatement.java        |   5 +-
 .../cassandra/cql3/MultiColumnRelationTest.java | 100 ++++++++++++++-----
 .../cql3/SingleColumnRelationTest.java          |  67 +++++++++++++
 13 files changed, 226 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
index 8a57292,0000000..5492c2b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/ForwardingPrimaryKeyRestrictions.java
@@@ -1,159 -1,0 +1,160 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A <code>PrimaryKeyRestrictions</code> which forwards all its method calls to another 
 + * <code>PrimaryKeyRestrictions</code>. Subclasses should override one or more methods to modify the behavior 
 + * of the backing <code>PrimaryKeyRestrictions</code> as desired per the decorator pattern. 
 + */
 +abstract class ForwardingPrimaryKeyRestrictions implements PrimaryKeyRestrictions
 +{
 +    /**
 +     * Returns the backing delegate instance that methods are forwarded to.
 +     * @return the backing delegate instance that methods are forwarded to.
 +     */
 +    protected abstract PrimaryKeyRestrictions getDelegate();
 +
 +    @Override
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        return getDelegate().usesFunction(ksName, functionName);
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return getDelegate().getColumnDefs();
 +    }
 +
 +    @Override
 +    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        return getDelegate().mergeWith(restriction);
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager)
 +    {
 +        return getDelegate().hasSupportingIndex(secondaryIndexManager);
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +    {
 +        return getDelegate().values(options);
 +    }
 +
 +    @Override
 +    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
 +        return getDelegate().valuesAsComposites(options);
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> bounds(Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        return getDelegate().bounds(bound, options);
 +    }
 +
 +    @Override
 +    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        return getDelegate().boundsAsComposites(bound, options);
 +    }
 +
 +    @Override
 +    public boolean isInclusive(Bound bound)
 +    {
 +        return getDelegate().isInclusive(bound.reverse());
 +    }
 +
 +    @Override
 +    public boolean isEmpty()
 +    {
 +        return getDelegate().isEmpty();
 +    }
 +
 +    @Override
 +    public int size()
 +    {
 +        return getDelegate().size();
 +    }
 +
 +    @Override
 +    public boolean isOnToken()
 +    {
 +        return getDelegate().isOnToken();
 +    }
 +
 +    @Override
 +    public boolean isSlice()
 +    {
 +        return getDelegate().isSlice();
 +    }
 +
 +    @Override
 +    public boolean isEQ()
 +    {
 +        return getDelegate().isEQ();
 +    }
 +
 +    @Override
 +    public boolean isIN()
 +    {
 +        return getDelegate().isIN();
 +    }
 +
 +    @Override
 +    public boolean isContains()
 +    {
 +        return getDelegate().isContains();
 +    }
 +
 +    @Override
 +    public boolean isMultiColumn()
 +    {
 +        return getDelegate().isMultiColumn();
 +    }
 +
 +    @Override
 +    public boolean hasBound(Bound b)
 +    {
 +        return getDelegate().hasBound(b);
 +    }
 +
 +    @Override
 +    public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options) throws InvalidRequestException
 +    {
-         getDelegate().addIndexExpressionTo(expressions, options);
++        getDelegate().addIndexExpressionTo(expressions, indexManager, options);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index 2d6deeb,0000000..9f6ab4c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@@ -1,518 -1,0 +1,519 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.AbstractMarker;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.Tuples;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CBuilder;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.Composites;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +public abstract class MultiColumnRestriction extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The columns to which the restriction apply.
 +     */
 +    protected final List<ColumnDefinition> columnDefs;
 +
 +    public MultiColumnRestriction(CType ctype, List<ColumnDefinition> columnDefs)
 +    {
 +        super(ctype);
 +        this.columnDefs = columnDefs;
 +    }
 +
 +    @Override
 +    public boolean isMultiColumn()
 +    {
 +        return true;
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return columnDefs;
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(valuesAsComposites(options));
 +    }
 +
 +    @Override
 +    public final PrimaryKeyRestrictions mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +    {
 +            checkTrue(otherRestriction.isMultiColumn(),
 +                      "Mixing single column relations and multi column relations on clustering columns is not allowed");
 +            return doMergeWith((PrimaryKeyRestrictions) otherRestriction);
 +    }
 +
 +    protected abstract PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException;
 +
 +    /**
 +     * Returns the names of the columns that are specified within this <code>Restrictions</code> and the other one
 +     * as a comma separated <code>String</code>.
 +     *
 +     * @param otherRestrictions the other restrictions
 +     * @return the names of the columns that are specified within this <code>Restrictions</code> and the other one
 +     * as a comma separated <code>String</code>.
 +     */
 +    protected final String getColumnsInCommons(Restrictions otherRestrictions)
 +    {
 +        Set<ColumnDefinition> commons = new HashSet<>(getColumnDefs());
 +        commons.retainAll(otherRestrictions.getColumnDefs());
 +        StringBuilder builder = new StringBuilder();
 +        for (ColumnDefinition columnDefinition : commons)
 +        {
 +            if (builder.length() != 0)
 +                builder.append(" ,");
 +            builder.append(columnDefinition.name);
 +        }
 +        return builder.toString();
 +    }
 +
 +    @Override
 +    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        for (ColumnDefinition columnDef : columnDefs)
 +        {
 +            SecondaryIndex index = indexManager.getIndexForColumn(columnDef.name.bytes);
 +            if (index != null && isSupportedBy(index))
 +                return true;
 +        }
 +        return false;
 +    }
 +
++    @Override
++    public final void addIndexExpressionTo(List<IndexExpression> expressions,
++                                           SecondaryIndexManager indexManager,
++                                           QueryOptions options) throws InvalidRequestException
++    {
++        for (ColumnDefinition columnDef : columnDefs)
++        {
++            SecondaryIndex index = indexManager.getIndexForColumn(columnDef.name.bytes);
++            if (index != null && isSupportedBy(index))
++                expressions.add(getIndexExpression(columnDef, options));
++        }
++    }
++
++    /**
++     * Returns the <code>IndexExpression</code> for the specified column.
++     *
++     * @param columnDef the column definition
++     * @param options the query options
++     * @return the <code>IndexExpression</code> for the specified column
++     */
++    protected IndexExpression getIndexExpression(ColumnDefinition columnDef,
++                                                 QueryOptions options) throws InvalidRequestException
++    {
++        // Except for EQ this operation is not supported
++        throw new UnsupportedOperationException();
++    }
++
 +    /**
-      * Check if this type of restriction is supported for the specified column by the specified index.
++     * Check if this type of restriction is supported for by the specified index.
 +     * @param index the Secondary index
 +     *
 +     * @return <code>true</code> this type of restriction is supported by the specified index,
 +     * <code>false</code> otherwise.
 +     */
 +    protected abstract boolean isSupportedBy(SecondaryIndex index);
 +
 +    public static class EQ  extends MultiColumnRestriction
 +    {
 +        protected final Term value;
 +
 +        public EQ(CType ctype, List<ColumnDefinition> columnDefs, Term value)
 +        {
 +            super(ctype, columnDefs);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(value, ksName, functionName);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("EQ(%s)", value);
 +        }
 +
 +        @Override
 +        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal",
 +                                 getColumnsInCommons(otherRestriction));
 +        }
 +
 +        @Override
 +        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +        {
 +            return Collections.singletonList(compositeValue(options));
 +        }
 +
 +        @Override
 +        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +        {
 +            Composite prefix = compositeValue(options);
 +            return Collections.singletonList(ctype.size() > prefix.size() && bound.isEnd()
 +                                             ? prefix.end()
 +                                             : prefix);
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.EQ);
 +        }
 +
 +        private Composite compositeValue(QueryOptions options) throws InvalidRequestException
 +        {
 +            CBuilder builder = ctype.builder();
 +            Tuples.Value t = ((Tuples.Value) value.bind(options));
 +            List<ByteBuffer> values = t.getElements();
 +            for (int i = 0; i < values.size(); i++)
 +            {
 +                ByteBuffer component = checkNotNull(values.get(i),
 +                                                    "Invalid null value in condition for column %s",
 +                                                    columnDefs.get(i).name);
 +                builder.add(component);
 +            }
 +
 +            return builder.build();
 +        }
 +
 +        @Override
-         public final void addIndexExpressionTo(List<IndexExpression> expressions,
-                                                QueryOptions options) throws InvalidRequestException
++        protected final IndexExpression getIndexExpression(ColumnDefinition columnDef,
++                                                           QueryOptions options) throws InvalidRequestException
 +        {
 +            Tuples.Value t = ((Tuples.Value) value.bind(options));
 +            List<ByteBuffer> values = t.getElements();
-             for (int i = 0; i < values.size(); i++)
-             {
-                 ColumnDefinition columnDef = columnDefs.get(i);
-                 ByteBuffer component = validateIndexedValue(columnDef, values.get(i));
-                 expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, component));
-             }
++            ByteBuffer component = validateIndexedValue(columnDef, values.get(columnDef.position()));
++            return new IndexExpression(columnDef.name.bytes, Operator.EQ, component);
 +        }
 +    }
 +
 +    public abstract static class IN extends MultiColumnRestriction
 +    {
 +        @Override
 +        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +        {
 +            CBuilder builder = ctype.builder();
 +            List<List<ByteBuffer>> splitInValues = splitValues(options);
 +            // The IN query might not have listed the values in comparator order, so we need to re-sort
 +            // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
 +            TreeSet<Composite> inValues = new TreeSet<>(ctype);
 +            for (List<ByteBuffer> components : splitInValues)
 +            {
 +                for (int i = 0; i < components.size(); i++)
 +                    checkNotNull(components.get(i), "Invalid null value in condition for column " + columnDefs.get(i).name);
 +
 +                inValues.add(builder.buildWith(components));
 +            }
 +            return new ArrayList<>(inValues);
 +        }
 +
 +        @Override
 +        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +        {
 +            CBuilder builder = ctype.builder();
 +            List<List<ByteBuffer>> splitInValues = splitValues(options);
 +            // The IN query might not have listed the values in comparator order, so we need to re-sort
 +            // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
 +            TreeSet<Composite> inValues = new TreeSet<>(ctype);
 +            for (List<ByteBuffer> components : splitInValues)
 +            {
 +                for (int i = 0; i < components.size(); i++)
 +                    checkNotNull(components.get(i), "Invalid null value in condition for column %s", columnDefs.get(i).name);
 +
 +                Composite prefix = builder.buildWith(components);
 +                inValues.add(bound.isEnd() && builder.remainingCount() - components.size() > 0
 +                             ? prefix.end()
 +                             : prefix);
 +            }
 +            return new ArrayList<>(inValues);
 +        }
 +
-         @Override
-         public void addIndexExpressionTo(List<IndexExpression> expressions,
-                                          QueryOptions options) throws InvalidRequestException
-         {
-             List<List<ByteBuffer>> splitInValues = splitValues(options);
-             checkTrue(splitInValues.size() == 1, "IN restrictions are not supported on indexed columns");
- 
-             List<ByteBuffer> values = splitInValues.get(0);
-             checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
- 
-             ColumnDefinition columnDef = columnDefs.get(0);
-             ByteBuffer component = validateIndexedValue(columnDef, values.get(0));
-             expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, component));
-         }
- 
 +        public IN(CType ctype, List<ColumnDefinition> columnDefs)
 +        {
 +            super(ctype, columnDefs);
 +        }
 +
 +        @Override
 +        public boolean isIN()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestrictions) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a IN",
 +                                 getColumnsInCommons(otherRestrictions));
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.IN);
 +        }
 +
 +        protected abstract List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException;
 +    }
 +
 +    /**
 +     * An IN restriction that has a set of terms for in values.
 +     * For example: "SELECT ... WHERE (a, b, c) IN ((1, 2, 3), (4, 5, 6))" or "WHERE (a, b, c) IN (?, ?)"
 +     */
 +    public static class InWithValues extends MultiColumnRestriction.IN
 +    {
 +        protected final List<Term> values;
 +
 +        public InWithValues(CType ctype, List<ColumnDefinition> columnDefs, List<Term> values)
 +        {
 +            super(ctype, columnDefs);
 +            this.values = values;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(values, ksName, functionName);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("IN(%s)", values);
 +        }
 +
 +        @Override
 +        protected List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException
 +        {
 +            List<List<ByteBuffer>> buffers = new ArrayList<>(values.size());
 +            for (Term value : values)
 +            {
 +                Term.MultiItemTerminal term = (Term.MultiItemTerminal) value.bind(options);
 +                buffers.add(term.getElements());
 +            }
 +            return buffers;
 +        }
 +    }
 +
 +    /**
 +     * An IN restriction that uses a single marker for a set of IN values that are tuples.
 +     * For example: "SELECT ... WHERE (a, b, c) IN ?"
 +     */
 +    public static class InWithMarker extends MultiColumnRestriction.IN
 +    {
 +        protected final AbstractMarker marker;
 +
 +        public InWithMarker(CType ctype, List<ColumnDefinition> columnDefs, AbstractMarker marker)
 +        {
 +            super(ctype, columnDefs);
 +            this.marker = marker;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "IN ?";
 +        }
 +
 +        @Override
 +        protected List<List<ByteBuffer>> splitValues(QueryOptions options) throws InvalidRequestException
 +        {
 +            Tuples.InMarker inMarker = (Tuples.InMarker) marker;
 +            Tuples.InValue inValue = inMarker.bind(options);
 +            checkNotNull(inValue, "Invalid null value for IN restriction");
 +            return inValue.getSplitValues();
 +        }
 +    }
 +
 +    public static class Slice extends MultiColumnRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(CType ctype, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term)
 +        {
 +            this(ctype, columnDefs, TermSlice.newInstance(bound, inclusive, term));
 +        }
 +
 +        private Slice(CType ctype, List<ColumnDefinition> columnDefs, TermSlice slice)
 +        {
 +            super(ctype, columnDefs);
 +            this.slice = slice;
 +        }
 +
 +        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +        {
 +            return Composites.toByteBuffers(boundsAsComposites(b, options));
 +        }
 +
 +        @Override
 +        public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +        {
 +            CBuilder builder = ctype.builder();
 +            Iterator<ColumnDefinition> iter = columnDefs.iterator();
 +            ColumnDefinition firstName = iter.next();
 +            // A hack to preserve pre-6875 behavior for tuple-notation slices where the comparator mixes ASCENDING
 +            // and DESCENDING orders.  This stores the bound for the first component; we will re-use it for all following
 +            // components, even if they don't match the first component's reversal/non-reversal.  Note that this does *not*
 +            // guarantee correct query results, it just preserves the previous behavior.
 +            Bound firstComponentBound = !firstName.isReversedType() ? bound : bound.reverse();
 +
 +            if (!hasBound(firstComponentBound))
 +            {
 +                Composite prefix = builder.build();
 +                return Collections.singletonList(builder.remainingCount() > 0 && bound.isEnd()
 +                        ? prefix.end()
 +                        : prefix);
 +            }
 +
 +            List<ByteBuffer> vals = componentBounds(firstComponentBound, options);
 +
 +            ByteBuffer v = checkNotNull(vals.get(firstName.position()), "Invalid null value in condition for column %s", firstName.name);
 +            builder.add(v);
 +
 +            while (iter.hasNext())
 +            {
 +                ColumnDefinition def = iter.next();
 +                if (def.position() >= vals.size())
 +                    break;
 +
 +                v = checkNotNull(vals.get(def.position()), "Invalid null value in condition for column %s", def.name);
 +                builder.add(v);
 +            }
 +            Composite.EOC eoc =  eocFor(this, bound, firstComponentBound);
 +            return Collections.singletonList(builder.build().withEOC(eoc));
 +        }
 +
 +        @Override
-         public void addIndexExpressionTo(List<IndexExpression> expressions,
-                                          QueryOptions options) throws InvalidRequestException
-         {
-             throw invalidRequest("Slice restrictions are not supported on indexed columns which are part of a multi column relation");
-         }
- 
-         @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return slice.isSupportedBy(index);
 +        }
 +
 +        private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
 +        {
 +            if (eocBound.isStart())
 +                return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
 +
 +            return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return (slice.hasBound(Bound.START) && usesFunction(slice.bound(Bound.START), ksName, functionName))
 +                    || (slice.hasBound(Bound.END) && usesFunction(slice.bound(Bound.END), ksName, functionName));
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
 +        public PrimaryKeyRestrictions doMergeWith(PrimaryKeyRestrictions otherRestriction) throws InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isSlice(),
 +                      "Column \"%s\" cannot be restricted by both an equality and an inequality relation",
 +                      getColumnsInCommons(otherRestriction));
 +
 +            Slice otherSlice = (Slice) otherRestriction;
 +
 +            checkFalse(hasBound(Bound.START) && otherSlice.hasBound(Bound.START),
 +                       "More than one restriction was found for the start bound on %s",
 +                       getColumnsInCommons(otherRestriction));
 +            checkFalse(hasBound(Bound.END) && otherSlice.hasBound(Bound.END),
 +                       "More than one restriction was found for the end bound on %s",
 +                       getColumnsInCommons(otherRestriction));
 +
 +            List<ColumnDefinition> newColumnDefs = size() >= otherSlice.size() ?  columnDefs : otherSlice.columnDefs;
 +            return new Slice(ctype,  newColumnDefs, slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "SLICE" + slice;
 +        }
 +
 +        /**
 +         * Similar to bounds(), but returns one ByteBuffer per-component in the bound instead of a single
 +         * ByteBuffer to represent the entire bound.
 +         * @param b the bound type
 +         * @param options the query options
 +         * @return one ByteBuffer per-component in the bound
 +         * @throws InvalidRequestException if the components cannot be retrieved
 +         */
 +        private List<ByteBuffer> componentBounds(Bound b, QueryOptions options) throws InvalidRequestException
 +        {
 +            Tuples.Value value = (Tuples.Value) slice.bound(b).bind(options);
 +            return value.getElements();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
index d0ed193,0000000..f6d0c73
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@@ -1,97 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.List;
 +
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * A restriction/clause on a column.
 + * The goal of this class being to group all conditions for a column in a SELECT.
 + */
 +public interface Restriction
 +{
 +    public boolean isOnToken();
 +    public boolean isSlice();
 +    public boolean isEQ();
 +    public boolean isIN();
 +    public boolean isContains();
 +    public boolean isMultiColumn();
 +
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
 +
 +    /**
 +     * Returns <code>true</code> if one of the restrictions use the specified function.
 +     *
 +     * @param ksName the keyspace name
 +     * @param functionName the function name
 +     * @return <code>true</code> if one of the restrictions use the specified function, <code>false</code> otherwise.
 +     */
 +    public boolean usesFunction(String ksName, String functionName);
 +
 +    /**
 +     * Checks if the specified bound is set or not.
 +     * @param b the bound type
 +     * @return <code>true</code> if the specified bound is set, <code>false</code> otherwise
 +     */
 +    public boolean hasBound(Bound b);
 +
 +    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException;
 +
 +    /**
 +     * Checks if the specified bound is inclusive or not.
 +     * @param b the bound type
 +     * @return <code>true</code> if the specified bound is inclusive, <code>false</code> otherwise
 +     */
 +    public boolean isInclusive(Bound b);
 +
 +    /**
 +     * Merges this restriction with the specified one.
 +     *
 +     * @param otherRestriction the restriction to merge into this one
 +     * @return the restriction resulting of the merge
 +     * @throws InvalidRequestException if the restrictions cannot be merged
 +     */
 +    public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException;
 +
 +    /**
 +     * Check if the restriction is on indexed columns.
 +     *
 +     * @param indexManager the index manager
 +     * @return <code>true</code> if the restriction is on indexed columns, <code>false</code>
 +     */
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
 +
 +    /**
 +     * Adds to the specified list the <code>IndexExpression</code>s corresponding to this <code>Restriction</code>.
 +     *
 +     * @param expressions the list to add the <code>IndexExpression</code>s to
++     * @param indexManager the secondary index manager
 +     * @param options the query options
 +     * @throws InvalidRequestException if this <code>Restriction</code> cannot be converted into 
 +     * <code>IndexExpression</code>s
 +     */
 +    public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options)
 +                                     throws InvalidRequestException;
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index cf2555e,0000000..3cfe4ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@@ -1,82 -1,0 +1,84 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * Sets of restrictions
 + */
 +interface Restrictions
 +{
 +    /**
 +     * Returns the column definitions in position order.
 +     * @return the column definitions in position order.
 +     */
 +    public Collection<ColumnDefinition> getColumnDefs();
 +
 +    /**
 +     * Returns <code>true</code> if one of the restrictions use the specified function.
 +     *
 +     * @param ksName the keyspace name
 +     * @param functionName the function name
 +     * @return <code>true</code> if one of the restrictions use the specified function, <code>false</code> otherwise.
 +     */
 +    public boolean usesFunction(String ksName, String functionName);
 +
 +    /**
 +     * Check if the restriction is on indexed columns.
 +     *
 +     * @param indexManager the index manager
 +     * @return <code>true</code> if the restriction is on indexed columns, <code>false</code>
 +     */
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
 +
 +    /**
 +     * Adds to the specified list the <code>IndexExpression</code>s corresponding to this <code>Restriction</code>.
 +     *
 +     * @param expressions the list to add the <code>IndexExpression</code>s to
++     * @param indexManager the secondary index manager
 +     * @param options the query options
 +     * @throws InvalidRequestException if this <code>Restriction</code> cannot be converted into
 +     * <code>IndexExpression</code>s
 +     */
 +    public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options)
 +                                     throws InvalidRequestException;
 +
 +    /**
 +     * Checks if this <code>SingleColumnPrimaryKeyRestrictions</code> is empty or not.
 +     *
 +     * @return <code>true</code> if this <code>SingleColumnPrimaryKeyRestrictions</code> is empty, <code>false</code> otherwise.
 +     */
 +    boolean isEmpty();
 +
 +    /**
 +     * Returns the number of columns that have a restriction.
 +     *
 +     * @return the number of columns that have a restriction.
 +     */
 +    public int size();
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
index e109036,0000000..945479a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
@@@ -1,305 -1,0 +1,327 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CBuilder;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.Composite.EOC;
 +import org.apache.cassandra.db.composites.Composites;
 +import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +
 +/**
 + * A set of single column restrictions on a primary key part (partition key or clustering key).
 + */
 +final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The restrictions.
 +     */
 +    private final SingleColumnRestrictions restrictions;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code> otherwise.
 +     */
 +    private boolean eq;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an IN, <code>false</code> otherwise.
 +     */
 +    private boolean in;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code> otherwise.
 +     */
 +    private boolean slice;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Contains, <code>false</code> otherwise.
 +     */
 +    private boolean contains;
 +
 +    public SingleColumnPrimaryKeyRestrictions(CType ctype)
 +    {
 +        super(ctype);
 +        this.restrictions = new SingleColumnRestrictions();
 +        this.eq = true;
 +    }
 +
 +    private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
 +                                               SingleColumnRestriction restriction) throws InvalidRequestException
 +    {
 +        super(primaryKeyRestrictions.ctype);
 +        this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
 +
 +        if (!primaryKeyRestrictions.isEmpty())
 +        {
 +            ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
 +            ColumnDefinition newColumn = restriction.getColumnDef();
 +
 +            checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position() > lastColumn.position(),
 +                       "Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                       newColumn.name,
 +                       lastColumn.name);
 +
 +            if (newColumn.position() < lastColumn.position())
 +                checkFalse(restriction.isSlice(),
 +                           "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                           restrictions.nextColumn(newColumn).name,
 +                           newColumn.name);
 +        }
 +
 +        if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
 +            this.slice = true;
 +        else if (restriction.isContains() || primaryKeyRestrictions.isContains())
 +            this.contains = true;
 +        else if (restriction.isIN())
 +            this.in = true;
 +        else
 +            this.eq = true;
 +    }
 +
 +    @Override
 +    public boolean isSlice()
 +    {
 +        return slice;
 +    }
 +
 +    @Override
 +    public boolean isEQ()
 +    {
 +        return eq;
 +    }
 +
 +    @Override
 +    public boolean isIN()
 +    {
 +        return in;
 +    }
 +
 +    @Override
 +    public boolean isOnToken()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean isContains()
 +    {
 +        return contains;
 +    }
 +
 +    @Override
 +    public boolean isMultiColumn()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        return restrictions.usesFunction(ksName, functionName);
 +    }
 +
 +    @Override
 +    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +        {
 +            checkTrue(isEmpty(),
 +                      "Mixing single column relations and multi column relations on clustering columns is not allowed");
 +            return (PrimaryKeyRestrictions) restriction;
 +        }
 +
 +        if (restriction.isOnToken())
 +        {
 +            if (isEmpty())
 +                return (PrimaryKeyRestrictions) restriction;
 +
 +            return new TokenFilter(this, (TokenRestriction) restriction);
 +        }
 +
 +        return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
 +    }
 +
 +    @Override
 +    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
 +        CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
 +        for (ColumnDefinition def : restrictions.getColumnDefs())
 +        {
 +            Restriction r = restrictions.getRestriction(def);
 +            assert !r.isSlice();
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            builder.addEachElementToAll(values);
 +            checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
 +        }
 +
 +        return builder.build();
 +    }
 +
 +    @Override
 +    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        CBuilder builder = ctype.builder();
 +        List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
 +
 +        CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
 +        // The end-of-component of composite doesn't depend on whether the
 +        // component type is reversed or not (i.e. the ReversedType is applied
 +        // to the component comparator but not to the end-of-component itself),
 +        // it only depends on whether the slice is reversed
 +        int keyPosition = 0;
 +        for (ColumnDefinition def : defs)
 +        {
 +            // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
 +            // So if we're doing a reverse slice, we must inverse the bounds when giving them as start and end of the slice filter.
 +            // But if the actual comparator itself is reversed, we must inversed the bounds too.
 +            Bound b = !def.isReversedType() ? bound : bound.reverse();
 +            Restriction r = restrictions.getRestriction(def);
 +            if (keyPosition != def.position() || r.isContains())
 +                return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
 +
 +            if (r.isSlice())
 +            {
 +                if (!r.hasBound(b))
 +                {
 +                    // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
 +                    // For composites, if there was preceding component and we're computing the end, we must change the last component
 +                    // End-Of-Component, otherwise we would be selecting only one record.
 +                    return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
 +                }
 +
 +                ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null clustering key part %s", r);
 +                compositeBuilder.addElementToAll(value);
 +                Composite.EOC eoc = eocFor(r, bound, b);
 +                return compositeBuilder.buildWithEOC(eoc);
 +            }
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            compositeBuilder.addEachElementToAll(values);
 +
 +            checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part %s", def.name);
 +            keyPosition++;
 +        }
 +        // Means no relation at all or everything was an equal
 +        // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
 +        // it would be harmless to do it. However, we use this method got the partition key too. And when a query
 +        // with 2ndary index is done, and with the the partition provided with an EQ, we'll end up here, and in that
 +        // case using the eoc would be bad, since for the random partitioner we have no guarantee that
 +        // prefix.end() will sort after prefix (see #5240).
 +        EOC eoc = !compositeBuilder.hasRemaining() ? EOC.NONE : (bound.isEnd() ? EOC.END : EOC.START);
 +        return compositeBuilder.buildWithEOC(eoc);
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(valuesAsComposites(options));
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(boundsAsComposites(b, options));
 +    }
 +
 +    private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
 +    {
 +        if (eocBound.isStart())
 +            return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
 +
 +        return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
 +    }
 +
 +    @Override
 +    public boolean hasBound(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().hasBound(b);
 +    }
 +
 +    @Override
 +    public boolean isInclusive(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().isInclusive(b);
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        return restrictions.hasSupportingIndex(indexManager);
 +    }
 +
 +    @Override
-     public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
++    public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                     SecondaryIndexManager indexManager,
++                                     QueryOptions options) throws InvalidRequestException
 +    {
-         restrictions.addIndexExpressionTo(expressions, options);
++        Boolean clusteringColumns = null;
++        int position = 0;
++
++        for (ColumnDefinition columnDef : restrictions.getColumnDefs())
++        {
++            // SingleColumnPrimaryKeyRestrictions contains only one kind of column, either partition key or clustering columns.
++            // Therefore we only need to check the column kind once. All the other columns will be of the same kind.
++            if (clusteringColumns == null)
++                clusteringColumns = columnDef.isClusteringColumn() ? Boolean.TRUE : Boolean.FALSE;
++
++            Restriction restriction = restrictions.getRestriction(columnDef);
++
++            // We ignore all the clustering columns that can be handled by slices.
++            if (clusteringColumns && !restriction.isContains()&& position == columnDef.position())
++            {
++                position++;
++                if (!restriction.hasSupportingIndex(indexManager))
++                    continue;
++            }
++            restriction.addIndexExpressionTo(expressions, indexManager, options);
++        }
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return restrictions.getColumnDefs();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 4acd34b,0000000..9fbe462
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@@ -1,515 -1,0 +1,521 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.AbstractMarker;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.db.marshal.CompositeType;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 +
 +public abstract class SingleColumnRestriction extends AbstractRestriction
 +{
 +    /**
 +     * The definition of the column to which apply the restriction.
 +     */
 +    protected final ColumnDefinition columnDef;
 +
 +    public SingleColumnRestriction(ColumnDefinition columnDef)
 +    {
 +        this.columnDef = columnDef;
 +    }
 +
 +    /**
 +     * Returns the definition of the column to which is associated this restriction.
 +     * @return the definition of the column to which is associated this restriction
 +     */
 +    public ColumnDefinition getColumnDef()
 +    {
 +        return columnDef;
 +    }
 +
 +    @Override
 +    public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                     SecondaryIndexManager indexManager,
 +                                     QueryOptions options) throws InvalidRequestException
 +    {
 +        List<ByteBuffer> values = values(options);
 +        checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
 +
 +        ByteBuffer value = validateIndexedValue(columnDef, values.get(0));
 +        expressions.add(new IndexExpression(columnDef.name.bytes, Operator.EQ, value));
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        SecondaryIndex index = indexManager.getIndexForColumn(columnDef.name.bytes);
 +        return index != null && isSupportedBy(index);
 +    }
 +
 +    /**
 +     * Check if this type of restriction is supported by the specified index.
 +     *
 +     * @param index the Secondary index
 +     * @return <code>true</code> this type of restriction is supported by the specified index,
 +     * <code>false</code> otherwise.
 +     */
 +    protected abstract boolean isSupportedBy(SecondaryIndex index);
 +
 +    public static final class EQ extends SingleColumnRestriction
 +    {
 +        private final Term value;
 +
 +        public EQ(ColumnDefinition columnDef, Term value)
 +        {
 +            super(columnDef);
 +            this.value = value;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(value, ksName, functionName);
 +        }
 +
++        @Override
 +        public boolean isEQ()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +        {
 +            return Collections.singletonList(value.bindAndGet(options));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("EQ(%s)", value);
 +        }
 +
 +        @Override
 +        public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes an Equal", columnDef.name);
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.EQ);
 +        }
 +    }
 +
 +    public static abstract class IN extends SingleColumnRestriction
 +    {
 +        public IN(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +
 +        @Override
 +        public final boolean isIN()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public final Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            throw invalidRequest("%s cannot be restricted by more than one relation if it includes a IN", columnDef.name);
 +        }
 +
 +        @Override
 +        protected final boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return index.supportsOperator(Operator.IN);
 +        }
 +    }
 +
 +    public static class InWithValues extends IN
 +    {
 +        protected final List<Term> values;
 +
 +        public InWithValues(ColumnDefinition columnDef, List<Term> values)
 +        {
 +            super(columnDef);
 +            this.values = values;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(values, ksName, functionName);
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(values.size());
 +            for (Term value : values)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("IN(%s)", values);
 +        }
 +    }
 +
 +    public static class InWithMarker extends IN
 +    {
 +        protected final AbstractMarker marker;
 +
 +        public InWithMarker(ColumnDefinition columnDef, AbstractMarker marker)
 +        {
 +            super(columnDef);
 +            this.marker = marker;
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return false;
 +        }
 +
++        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +        {
 +            Term.MultiItemTerminal lval = (Term.MultiItemTerminal) marker.bind(options);
 +            if (lval == null)
 +                throw new InvalidRequestException("Invalid null value for IN restriction");
 +            return lval.getElements();
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "IN ?";
 +        }
 +    }
 +
 +    public static class Slice extends SingleColumnRestriction
 +    {
 +        private final TermSlice slice;
 +
 +        public Slice(ColumnDefinition columnDef, Bound bound, boolean inclusive, Term term)
 +        {
 +            super(columnDef);
 +            slice = TermSlice.newInstance(bound, inclusive, term);
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return (slice.hasBound(Bound.START) && usesFunction(slice.bound(Bound.START), ksName, functionName))
 +                    || (slice.hasBound(Bound.END) && usesFunction(slice.bound(Bound.END), ksName, functionName));
 +        }
 +
++        @Override
 +        public boolean isSlice()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            return slice.hasBound(b);
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +        {
 +            return Collections.singletonList(slice.bound(b).bindAndGet(options));
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            return slice.isInclusive(b);
 +        }
 +
 +        @Override
 +        public Restriction mergeWith(Restriction otherRestriction)
 +        throws InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isSlice(),
 +                      "Column \"%s\" cannot be restricted by both an equality and an inequality relation",
 +                      columnDef.name);
 +
 +            SingleColumnRestriction.Slice otherSlice = (SingleColumnRestriction.Slice) otherRestriction;
 +
 +            checkFalse(hasBound(Bound.START) && otherSlice.hasBound(Bound.START),
 +                       "More than one restriction was found for the start bound on %s", columnDef.name);
 +
 +            checkFalse(hasBound(Bound.END) && otherSlice.hasBound(Bound.END),
 +                       "More than one restriction was found for the end bound on %s", columnDef.name);
 +
 +            return new Slice(columnDef,  slice.merge(otherSlice.slice));
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options) throws InvalidRequestException
 +        {
 +            for (Bound b : Bound.values())
 +            {
 +                if (hasBound(b))
 +                {
 +                    ByteBuffer value = validateIndexedValue(columnDef, slice.bound(b).bindAndGet(options));
 +                    Operator op = slice.getIndexOperator(b);
 +                    // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operation
 +                    // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
 +                    // use the underlying comparator as is.
 +                    op = columnDef.isReversedType() ? op.reverse() : op;
 +                    expressions.add(new IndexExpression(columnDef.name.bytes, op, value));
 +                }
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            return slice.isSupportedBy(index);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("SLICE%s", slice);
 +        }
 +
 +        private Slice(ColumnDefinition columnDef, TermSlice slice)
 +        {
 +            super(columnDef);
 +            this.slice = slice;
 +        }
 +    }
 +
 +    // This holds CONTAINS, CONTAINS_KEY, and map[key] = value restrictions because we might want to have any combination of them.
 +    public static final class Contains extends SingleColumnRestriction
 +    {
 +        private List<Term> values = new ArrayList<>(); // for CONTAINS
 +        private List<Term> keys = new ArrayList<>(); // for CONTAINS_KEY
 +        private List<Term> entryKeys = new ArrayList<>(); // for map[key] = value
 +        private List<Term> entryValues = new ArrayList<>(); // for map[key] = value
 +
 +        public Contains(ColumnDefinition columnDef, Term t, boolean isKey)
 +        {
 +            super(columnDef);
 +            if (isKey)
 +                keys.add(t);
 +            else
 +                values.add(t);
 +        }
 +
 +        public Contains(ColumnDefinition columnDef, Term mapKey, Term mapValue)
 +        {
 +            super(columnDef);
 +            entryKeys.add(mapKey);
 +            entryValues.add(mapValue);
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +        {
 +            return bindAndGet(values, options);
 +        }
 +
 +        @Override
 +        public boolean isContains()
 +        {
 +            return true;
 +        }
 +
 +        @Override
 +        public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
 +        {
 +            checkTrue(otherRestriction.isContains(),
 +                      "Collection column %s can only be restricted by CONTAINS, CONTAINS KEY, or map-entry equality",
 +                      getColumnDef().name);
 +
 +            SingleColumnRestriction.Contains newContains = new Contains(getColumnDef());
 +
 +            copyKeysAndValues(this, newContains);
 +            copyKeysAndValues((Contains) otherRestriction, newContains);
 +
 +            return newContains;
 +        }
 +
 +        @Override
 +        public void addIndexExpressionTo(List<IndexExpression> expressions,
++                                         SecondaryIndexManager indexManager,
 +                                         QueryOptions options)
 +                                         throws InvalidRequestException
 +        {
 +            addExpressionsFor(expressions, values(options), Operator.CONTAINS);
 +            addExpressionsFor(expressions, keys(options), Operator.CONTAINS_KEY);
 +            addExpressionsFor(expressions, entries(options), Operator.EQ);
 +        }
 +
 +        private void addExpressionsFor(List<IndexExpression> target, List<ByteBuffer> values,
 +                                       Operator op) throws InvalidRequestException
 +        {
 +            for (ByteBuffer value : values)
 +            {
 +                validateIndexedValue(columnDef, value);
 +                target.add(new IndexExpression(columnDef.name.bytes, op, value));
 +            }
 +        }
 +
 +        @Override
 +        protected boolean isSupportedBy(SecondaryIndex index)
 +        {
 +            boolean supported = false;
 +
 +            if (numberOfValues() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS);
 +
 +            if (numberOfKeys() > 0)
 +                supported |= index.supportsOperator(Operator.CONTAINS_KEY);
 +
 +            if (numberOfEntries() > 0)
 +                supported |= index.supportsOperator(Operator.EQ);
 +
 +            return supported;
 +        }
 +
 +        public int numberOfValues()
 +        {
 +            return values.size();
 +        }
 +
 +        public int numberOfKeys()
 +        {
 +            return keys.size();
 +        }
 +
 +        public int numberOfEntries()
 +        {
 +            return entryKeys.size();
 +        }
 +
 +        @Override
 +        public boolean usesFunction(String ksName, String functionName)
 +        {
 +            return usesFunction(values, ksName, functionName) || usesFunction(keys, ksName, functionName) ||
 +                   usesFunction(entryKeys, ksName, functionName) || usesFunction(entryValues, ksName, functionName);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("CONTAINS(values=%s, keys=%s, entryKeys=%s, entryValues=%s)", values, keys, entryKeys, entryValues);
 +        }
 +
 +        @Override
 +        public boolean hasBound(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public boolean isInclusive(Bound b)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        private List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
 +        {
 +            return bindAndGet(keys, options);
 +        }
 +
 +        private List<ByteBuffer> entries(QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> entryBuffers = new ArrayList<>(entryKeys.size());
 +            List<ByteBuffer> keyBuffers = bindAndGet(entryKeys, options);
 +            List<ByteBuffer> valueBuffers = bindAndGet(entryValues, options);
 +            for (int i = 0; i < entryKeys.size(); i++)
 +            {
 +                if (valueBuffers.get(i) == null)
 +                    throw new InvalidRequestException("Unsupported null value for map-entry equality");
 +                entryBuffers.add(CompositeType.build(keyBuffers.get(i), valueBuffers.get(i)));
 +            }
 +            return entryBuffers;
 +        }
 +
 +        /**
 +         * Binds the query options to the specified terms and returns the resulting values.
 +         *
 +         * @param terms the terms
 +         * @param options the query options
 +         * @return the value resulting from binding the query options to the specified terms
 +         * @throws InvalidRequestException if a problem occurs while binding the query options
 +         */
 +        private static List<ByteBuffer> bindAndGet(List<Term> terms, QueryOptions options) throws InvalidRequestException
 +        {
 +            List<ByteBuffer> buffers = new ArrayList<>(terms.size());
 +            for (Term value : terms)
 +                buffers.add(value.bindAndGet(options));
 +            return buffers;
 +        }
 +
 +        /**
 +         * Copies the keys and value from the first <code>Contains</code> to the second one.
 +         *
 +         * @param from the <code>Contains</code> to copy from
 +         * @param to the <code>Contains</code> to copy to
 +         */
 +        private static void copyKeysAndValues(Contains from, Contains to)
 +        {
 +            to.values.addAll(from.values);
 +            to.keys.addAll(from.keys);
 +            to.entryKeys.addAll(from.entryKeys);
 +            to.entryValues.addAll(from.entryValues);
 +        }
 +
 +        private Contains(ColumnDefinition columnDef)
 +        {
 +            super(columnDef);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbc38cd3/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
index b9ffc68,0000000..b85e932
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
@@@ -1,209 -1,0 +1,210 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.Contains;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +/**
 + * Sets of single column restrictions.
 + */
 +final class SingleColumnRestrictions implements Restrictions
 +{
 +    /**
 +     * The comparator used to sort the <code>Restriction</code>s.
 +     */
 +    private static final Comparator<ColumnDefinition> COLUMN_DEFINITION_COMPARATOR = new Comparator<ColumnDefinition>()
 +    {
 +        @Override
 +        public int compare(ColumnDefinition column, ColumnDefinition otherColumn)
 +        {
 +            int value = Integer.compare(column.position(), otherColumn.position());
 +            return value != 0 ? value : column.name.bytes.compareTo(otherColumn.name.bytes);
 +        }
 +    };
 +
 +    /**
 +     * The restrictions per column.
 +     */
 +    protected final TreeMap<ColumnDefinition, Restriction> restrictions;
 +
 +    public SingleColumnRestrictions()
 +    {
 +        this(new TreeMap<ColumnDefinition, Restriction>(COLUMN_DEFINITION_COMPARATOR));
 +    }
 +
 +    protected SingleColumnRestrictions(TreeMap<ColumnDefinition, Restriction> restrictions)
 +    {
 +        this.restrictions = restrictions;
 +    }
 +
 +    @Override
 +    public final void addIndexExpressionTo(List<IndexExpression> expressions,
++                                           SecondaryIndexManager indexManager,
 +                                           QueryOptions options) throws InvalidRequestException
 +    {
 +        for (Restriction restriction : restrictions.values())
-             restriction.addIndexExpressionTo(expressions, options);
++            restriction.addIndexExpressionTo(expressions, indexManager, options);
 +    }
 +
 +    @Override
 +    public final Set<ColumnDefinition> getColumnDefs()
 +    {
 +        return restrictions.keySet();
 +    }
 +
 +    /**
 +     * Returns the restriction associated to the specified column.
 +     *
 +     * @param columnDef the column definition
 +     * @return the restriction associated to the specified column
 +     */
 +    public Restriction getRestriction(ColumnDefinition columnDef)
 +    {
 +        return restrictions.get(columnDef);
 +    }
 +
 +    @Override
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        for (Restriction restriction : restrictions.values())
 +            if (restriction.usesFunction(ksName, functionName))
 +                return true;
 +
 +        return false;
 +    }
 +
 +    @Override
 +    public final boolean isEmpty()
 +    {
 +        return getColumnDefs().isEmpty();
 +    }
 +
 +    @Override
 +    public final int size()
 +    {
 +        return getColumnDefs().size();
 +    }
 +
 +    /**
 +     * Adds the specified restriction to this set of restrictions.
 +     *
 +     * @param restriction the restriction to add
 +     * @return the new set of restrictions
 +     * @throws InvalidRequestException if the new restriction cannot be added
 +     */
 +    public SingleColumnRestrictions addRestriction(SingleColumnRestriction restriction) throws InvalidRequestException
 +    {
 +        TreeMap<ColumnDefinition, Restriction> newRestrictions = new TreeMap<>(this.restrictions);
 +        return new SingleColumnRestrictions(mergeRestrictions(newRestrictions, restriction));
 +    }
 +
 +    private static TreeMap<ColumnDefinition, Restriction> mergeRestrictions(TreeMap<ColumnDefinition, Restriction> restrictions,
 +                                                                            Restriction restriction)
 +                                                                            throws InvalidRequestException
 +    {
 +        ColumnDefinition def = ((SingleColumnRestriction) restriction).getColumnDef();
 +        Restriction existing = restrictions.get(def);
 +        Restriction newRestriction = mergeRestrictions(existing, restriction);
 +        restrictions.put(def, newRestriction);
 +        return restrictions;
 +    }
 +
 +    @Override
 +    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        for (Restriction restriction : restrictions.values())
 +        {
 +            if (restriction.hasSupportingIndex(indexManager))
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Returns the column after the specified one.
 +     *
 +     * @param columnDef the column for which the next one need to be found
 +     * @return the column after the specified one.
 +     */
 +    ColumnDefinition nextColumn(ColumnDefinition columnDef)
 +    {
 +        return restrictions.tailMap(columnDef, false).firstKey();
 +    }
 +
 +    /**
 +     * Returns the definition of the last column.
 +     *
 +     * @return the definition of the last column.
 +     */
 +    ColumnDefinition lastColumn()
 +    {
 +        return isEmpty() ? null : this.restrictions.lastKey();
 +    }
 +
 +    /**
 +     * Returns the last restriction.
 +     *
 +     * @return the last restriction.
 +     */
 +    Restriction lastRestriction()
 +    {
 +        return isEmpty() ? null : this.restrictions.lastEntry().getValue();
 +    }
 +
 +    /**
 +     * Merges the two specified restrictions.
 +     *
 +     * @param restriction the first restriction
 +     * @param otherRestriction the second restriction
 +     * @return the merged restriction
 +     * @throws InvalidRequestException if the two restrictions cannot be merged
 +     */
 +    private static Restriction mergeRestrictions(Restriction restriction,
 +                                                 Restriction otherRestriction) throws InvalidRequestException
 +    {
 +        return restriction == null ? otherRestriction
 +                                   : restriction.mergeWith(otherRestriction);
 +    }
 +
 +    /**
 +     * Checks if the restrictions contains multiple contains, contains key, or map[key] = value.
 +     *
 +     * @return <code>true</code> if the restrictions contains multiple contains, contains key, or ,
 +     * map[key] = value; <code>false</code> otherwise
 +     */
 +    public final boolean hasMultipleContains()
 +    {
 +        int numberOfContains = 0;
 +        for (Restriction restriction : restrictions.values())
 +        {
 +            if (restriction.isContains())
 +            {
 +                Contains contains = (Contains) restriction;
 +                numberOfContains += (contains.numberOfValues() + contains.numberOfKeys() + contains.numberOfEntries());
 +            }
 +        }
 +        return numberOfContains > 1;
 +    }
 +}