You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/02 20:09:06 UTC

[2/5] cassandra git commit: Refactor SelectStatement and Restrictions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65a7088e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 3360d40..022105c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -20,41 +20,50 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
-import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.statements.SingleColumnRestriction.Contains;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.selection.Selection;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.composites.Composite.EOC;
-import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.*;
-import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.thrift.ThriftValidation;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+
 /**
  * Encapsulates a completely parsed SELECT query, including the target
  * column family, expression, result count, and ordering clause.
@@ -70,96 +79,43 @@ public class SelectStatement implements CQLStatement
     private final Selection selection;
     private final Term limit;
 
-    /** Restrictions on partitioning columns */
-    private final Restriction[] keyRestrictions;
-
-    /** Restrictions on clustering columns */
-    private final Restriction[] columnRestrictions;
-
-    /** Restrictions on non-primary key columns (i.e. secondary index restrictions) */
-    private final Map<ColumnIdentifier, Restriction> metadataRestrictions = new HashMap<ColumnIdentifier, Restriction>();
-
-    // All restricted columns not covered by the key or index filter
-    private final Set<ColumnDefinition> restrictedColumns = new HashSet<ColumnDefinition>();
-    private Restriction.Slice sliceRestriction;
-
-    private boolean isReversed;
-    private boolean onToken;
-    private boolean isKeyRange;
-    private boolean keyIsInRelation;
-    private boolean usesSecondaryIndexing;
+    private final StatementRestrictions restrictions;
 
-    private Map<ColumnIdentifier, Integer> orderingIndexes;
+    private final boolean isReversed;
 
-    private boolean selectsStaticColumns;
-    private boolean selectsOnlyStaticColumns;
+    /**
+     * The comparator used to order results when multiple keys are selected (using IN).
+     */
+    private final Comparator<List<ByteBuffer>> orderingComparator;
 
     // Used by forSelection below
     private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier.Raw, Boolean>emptyMap(), false, false);
 
-    private static final Predicate<ColumnDefinition> isStaticFilter = new Predicate<ColumnDefinition>()
-    {
-        public boolean apply(ColumnDefinition def)
-        {
-            return def.isStatic();
-        }
-    };
-
-    public SelectStatement(CFMetaData cfm, int boundTerms, Parameters parameters, Selection selection, Term limit)
+    public SelectStatement(CFMetaData cfm,
+                           int boundTerms,
+                           Parameters parameters,
+                           Selection selection,
+                           StatementRestrictions restrictions,
+                           boolean isReversed,
+                           Comparator<List<ByteBuffer>> orderingComparator,
+                           Term limit)
     {
         this.cfm = cfm;
         this.boundTerms = boundTerms;
         this.selection = selection;
-        this.keyRestrictions = new Restriction[cfm.partitionKeyColumns().size()];
-        this.columnRestrictions = new Restriction[cfm.clusteringColumns().size()];
+        this.restrictions = restrictions;
+        this.isReversed = isReversed;
+        this.orderingComparator = orderingComparator;
         this.parameters = parameters;
         this.limit = limit;
-
-        // Now gather a few info on whether we should bother with static columns or not for this statement
-        initStaticColumnsInfo();
     }
 
+    @Override
     public boolean usesFunction(String ksName, String functionName)
     {
-        if (selection.usesFunction(ksName, functionName))
-            return true;
-        if (limit != null && limit.usesFunction(ksName, functionName))
-            return true;
-        for (Restriction restriction : metadataRestrictions.values())
-            if (restriction != null && restriction.usesFunction(ksName, functionName))
-                return true;
-        for (Restriction restriction : keyRestrictions)
-            if (restriction != null && restriction.usesFunction(ksName, functionName))
-                return true;
-        for (Restriction restriction : columnRestrictions)
-            if (restriction != null && restriction.usesFunction(ksName, functionName))
-                return true;
-        return false;
-    }
-
-    private void initStaticColumnsInfo()
-    {
-        if (!cfm.hasStaticColumns())
-            return;
-
-        // If it's a wildcard, we do select static but not only them
-        if (selection.isWildcard())
-        {
-            selectsStaticColumns = true;
-            return;
-        }
-
-        // Otherwise, check the selected columns
-        selectsStaticColumns = !Iterables.isEmpty(Iterables.filter(selection.getColumns(), isStaticFilter));
-        selectsOnlyStaticColumns = true;
-        for (ColumnDefinition def : selection.getColumns())
-        {
-            if (def.kind != ColumnDefinition.Kind.PARTITION_KEY && def.kind != ColumnDefinition.Kind.STATIC)
-            {
-                selectsOnlyStaticColumns = false;
-                break;
-            }
-        }
+        return selection.usesFunction(ksName, functionName)
+                || restrictions.usesFunction(ksName, functionName)
+                || (limit != null && limit.usesFunction(ksName, functionName));
     }
 
     // Creates a simple select based on the given selection.
@@ -167,7 +123,14 @@ public class SelectStatement implements CQLStatement
     // queried data through processColumnFamily.
     static SelectStatement forSelection(CFMetaData cfm, Selection selection)
     {
-        return new SelectStatement(cfm, 0, defaultParameters, selection, null);
+        return new SelectStatement(cfm,
+                                   0,
+                                   defaultParameters,
+                                   selection,
+                                   StatementRestrictions.empty(cfm),
+                                   false,
+                                   null,
+                                   null);
     }
 
     public ResultSet.Metadata getResultMetadata()
@@ -193,8 +156,7 @@ public class SelectStatement implements CQLStatement
     public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
-        if (cl == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
+        checkNotNull(cl, "Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
@@ -216,13 +178,14 @@ public class SelectStatement implements CQLStatement
         }
 
         QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
+
         if (selection.isAggregate())
             return pageAggregateQuery(pager, options, pageSize, now);
 
         // We can't properly do post-query ordering if we page (see #6722)
-        if (needsPostQueryOrdering())
-            throw new InvalidRequestException("Cannot page queries with both ORDER BY and a IN restriction on the partition key; you must either remove the "
-                                            + "ORDER BY or the IN and sort client side, or disable paging for this query");
+        checkFalse(needsPostQueryOrdering(),
+                  "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
+                  + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
 
         List<Row> page = pager.fetchPage(pageSize);
         ResultMessage.Rows msg = processResults(page, options, limit, now);
@@ -236,7 +199,7 @@ public class SelectStatement implements CQLStatement
     private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
     {
         int limitForQuery = updateLimitForQuery(limit);
-        if (isKeyRange || usesSecondaryIndexing)
+        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
             return getRangeCommand(options, limitForQuery, now);
 
         List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
@@ -330,9 +293,7 @@ public class SelectStatement implements CQLStatement
 
     private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
     {
-        Collection<ByteBuffer> keys = getKeys(options);
-        if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
-            return null;
+        Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
 
         List<ReadCommand> commands = new ArrayList<>(keys.size());
 
@@ -363,69 +324,12 @@ public class SelectStatement implements CQLStatement
         List<IndexExpression> expressions = getValidatedIndexExpressions(options);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        AbstractBounds<RowPosition> keyBounds = getKeyBounds(options);
+        AbstractBounds<RowPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
         return keyBounds == null
              ? null
              : new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
     }
 
-    private AbstractBounds<RowPosition> getKeyBounds(QueryOptions options) throws InvalidRequestException
-    {
-        IPartitioner p = StorageService.getPartitioner();
-
-        if (onToken)
-        {
-            Token startToken = getTokenBound(Bound.START, options, p);
-            Token endToken = getTokenBound(Bound.END, options, p);
-
-            boolean includeStart = includeKeyBound(Bound.START);
-            boolean includeEnd = includeKeyBound(Bound.END);
-
-            /*
-             * If we ask SP.getRangeSlice() for (token(200), token(200)], it will happily return the whole ring.
-             * However, wrapping range doesn't really make sense for CQL, and we want to return an empty result
-             * in that case (CASSANDRA-5573). So special case to create a range that is guaranteed to be empty.
-             *
-             * In practice, we want to return an empty result set if either startToken > endToken, or both are
-             * equal but one of the bound is excluded (since [a, a] can contains something, but not (a, a], [a, a)
-             * or (a, a)). Note though that in the case where startToken or endToken is the minimum token, then
-             * this special case rule should not apply.
-             */
-            int cmp = startToken.compareTo(endToken);
-            if (!startToken.isMinimum() && !endToken.isMinimum() && (cmp > 0 || (cmp == 0 && (!includeStart || !includeEnd))))
-                return null;
-
-            RowPosition start = includeStart ? startToken.minKeyBound() : startToken.maxKeyBound();
-            RowPosition end = includeEnd ? endToken.maxKeyBound() : endToken.minKeyBound();
-
-            return new Range<RowPosition>(start, end);
-        }
-        else
-        {
-            ByteBuffer startKeyBytes = getKeyBound(Bound.START, options);
-            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options);
-
-            RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
-            RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
-
-            if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum())
-                return null;
-
-            if (includeKeyBound(Bound.START))
-            {
-                return includeKeyBound(Bound.END)
-                     ? new Bounds<RowPosition>(startKey, finishKey)
-                     : new IncludingExcludingBounds<RowPosition>(startKey, finishKey);
-            }
-            else
-            {
-                return includeKeyBound(Bound.END)
-                     ? new Range<RowPosition>(startKey, finishKey)
-                     : new ExcludingBounds<RowPosition>(startKey, finishKey);
-            }
-        }
-    }
-
     private ColumnSlice makeStaticSlice()
     {
         // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the
@@ -444,18 +348,18 @@ public class SelectStatement implements CQLStatement
             // For distinct, we only care about fetching the beginning of each partition. If we don't have
             // static columns, we in fact only care about the first cell, so we query only that (we don't "group").
             // If we do have static columns, we do need to fetch the first full group (to have the static columns values).
-            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selectsStaticColumns ? toGroup : -1);
+            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selection.containsStaticColumns() ? toGroup : -1);
         }
-        else if (isColumnRange())
+        else if (restrictions.isColumnRange())
         {
-            List<Composite> startBounds = getRequestedBound(Bound.START, options);
-            List<Composite> endBounds = getRequestedBound(Bound.END, options);
+            List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options);
+            List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options);
             assert startBounds.size() == endBounds.size();
 
             // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
             // the part of the index to query so adding the static slice would be useless and confusing.
             // For 2i, static columns are retrieve in CompositesSearcher with each index hit.
-            ColumnSlice staticSlice = selectsStaticColumns && !usesSecondaryIndexing
+            ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing()
                                     ? makeStaticSlice()
                                     : null;
 
@@ -550,187 +454,41 @@ public class SelectStatement implements CQLStatement
 
     private int getLimit(QueryOptions options) throws InvalidRequestException
     {
-        int l = Integer.MAX_VALUE;
         if (limit != null)
         {
-            ByteBuffer b = limit.bindAndGet(options);
-            if (b == null)
-                throw new InvalidRequestException("Invalid null value of limit");
+            ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
 
             try
             {
                 Int32Type.instance.validate(b);
-                l = Int32Type.instance.compose(b);
+                int l = Int32Type.instance.compose(b);
+                checkTrue(l > 0, "LIMIT must be strictly positive");
+                return l;
             }
             catch (MarshalException e)
             {
                 throw new InvalidRequestException("Invalid limit value");
             }
         }
-
-        if (l <= 0)
-            throw new InvalidRequestException("LIMIT must be strictly positive");
-
-        return l;
+        return Integer.MAX_VALUE;
     }
 
     private int updateLimitForQuery(int limit)
     {
         // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary
         // and exclude it later (in processColumnFamily)
-        return sliceRestriction != null && (!sliceRestriction.isInclusive(Bound.START) || !sliceRestriction.isInclusive(Bound.END)) && limit != Integer.MAX_VALUE
+        return restrictions.isNonCompositeSliceWithExclusiveBounds() && limit != Integer.MAX_VALUE
              ? limit + 1
              : limit;
     }
 
-    private Collection<ByteBuffer> getKeys(final QueryOptions options) throws InvalidRequestException
-    {
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        CBuilder builder = cfm.getKeyValidatorAsCType().builder();
-        for (ColumnDefinition def : cfm.partitionKeyColumns())
-        {
-            Restriction r = keyRestrictions[def.position()];
-            assert r != null && !r.isSlice();
-
-            List<ByteBuffer> values = r.values(options);
-
-            if (builder.remainingCount() == 1)
-            {
-                for (ByteBuffer val : values)
-                {
-                    if (val == null)
-                        throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
-                    keys.add(builder.buildWith(val).toByteBuffer());
-                }
-            }
-            else
-            {
-                // Note: for backward compatibility reasons, we let INs with 1 value slide
-                if (values.size() != 1)
-                    throw new InvalidRequestException("IN is only supported on the last column of the partition key");
-                ByteBuffer val = values.get(0);
-                if (val == null)
-                    throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
-                builder.add(val);
-            }
-        }
-        return keys;
-    }
-
-    private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
-    {
-        // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first
-        // component of a composite partition key).
-        for (int i = 0; i < keyRestrictions.length; i++)
-            if (keyRestrictions[i] == null)
-                return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer();
-    }
-
-    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner p) throws InvalidRequestException
-    {
-        assert onToken;
-
-        Restriction restriction = keyRestrictions[0];
-
-        assert !restriction.isMultiColumn() : "Unexpectedly got a multi-column restriction on a partition key for a range query";
-        SingleColumnRestriction keyRestriction = (SingleColumnRestriction)restriction;
-
-        ByteBuffer value;
-        if (keyRestriction.isEQ())
-        {
-            value = keyRestriction.values(options).get(0);
-        }
-        else
-        {
-            SingleColumnRestriction.Slice slice = (SingleColumnRestriction.Slice)keyRestriction;
-            if (!slice.hasBound(b))
-                return p.getMinimumToken();
-
-            value = slice.bound(b, options);
-        }
-
-        if (value == null)
-            throw new InvalidRequestException("Invalid null token value");
-        return p.getTokenFactory().fromByteArray(value);
-    }
-
-    private boolean includeKeyBound(Bound b)
-    {
-        for (Restriction r : keyRestrictions)
-        {
-            if (r == null)
-                return true;
-            else if (r.isSlice())
-            {
-                assert !r.isMultiColumn() : "Unexpectedly got multi-column restriction on partition key";
-                return ((SingleColumnRestriction.Slice)r).isInclusive(b);
-            }
-        }
-        // All equality
-        return true;
-    }
-
-    private boolean isColumnRange()
-    {
-        // Due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
-        // Static CF (non dense but non composite) never entails a column slice however
-        if (!cfm.comparator.isDense())
-            return cfm.comparator.isCompound();
-
-        // Otherwise (i.e. for compact table where we don't have a row marker anyway and thus don't care about CASSANDRA-5762),
-        // it is a range query if it has at least one the column alias for which no relation is defined or is not EQ.
-        for (Restriction r : columnRestrictions)
-        {
-            if (r == null || r.isSlice())
-                return true;
-        }
-        return false;
-    }
-
     private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
     {
         // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
         // we always do a slice for CQL3 tables, so it's ok to ignore them here
-        assert !isColumnRange();
-
-        CompositesBuilder builder = new CompositesBuilder(cfm.comparator.prefixBuilder(), cfm.comparator);
-        Iterator<ColumnDefinition> idIter = cfm.clusteringColumns().iterator();
-        for (int i = 0; i < columnRestrictions.length; i++)
-        {
-            Restriction r = columnRestrictions[i];
-            ColumnDefinition def = idIter.next();
-            assert r != null && !r.isSlice();
-
-            if (r.isEQ() || !r.isMultiColumn())
-            {
-                List<ByteBuffer> values = r.values(options);
-                if (values.isEmpty())
-                    return null;
-                builder.addEachElementToAll(values);
-            }
-            else
-            {
-                // we have a multi-column IN restriction
-                List<List<ByteBuffer>> splitValues = ((MultiColumnRestriction.IN) r).splitValues(options);
-                if (splitValues.isEmpty())
-                    return null;
-
-                builder.addAllElementsToAll(splitValues);
-
-                // increment i to skip the remainder of the multicolumn restriction
-                i += splitValues.get(0).size() - 1;
-            }
-
-            if (builder.containsNull())
-                throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s",
-                        def.name));
-        }
-
-        SortedSet<CellName> columns = new TreeSet<>(cfm.comparator);
-        for (Composite composite : builder.build())
+        assert !restrictions.isColumnRange();
+        SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
+        for (Composite composite : restrictions.getClusteringColumnsAsComposites(options))
             columns.addAll(addSelectedColumns(composite));
         return columns;
     }
@@ -743,10 +501,6 @@ public class SelectStatement implements CQLStatement
         }
         else
         {
-            // Collections require doing a slice query because a given collection is a
-            // non-know set of columns, so we shouldn't get there
-            assert !selectACollection();
-
             SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
 
             // We need to query the selected column as well as the marker
@@ -760,7 +514,7 @@ public class SelectStatement implements CQLStatement
 
                 // selected columns
                 for (ColumnDefinition def : selection.getColumns())
-                    if (def.kind == ColumnDefinition.Kind.REGULAR || def.kind == ColumnDefinition.Kind.STATIC)
+                    if (def.isRegular() || def.isStatic())
                         columns.add(cfm.comparator.create(prefix, def));
             }
             else
@@ -773,368 +527,45 @@ public class SelectStatement implements CQLStatement
         }
     }
 
-    /** Returns true if a non-frozen collection is selected, false otherwise. */
-    private boolean selectACollection()
-    {
-        if (!cfm.comparator.hasCollections())
-            return false;
-
-        for (ColumnDefinition def : selection.getColumns())
-        {
-            if (def.type.isCollection() && def.type.isMultiCell())
-                return true;
-        }
-
-        return false;
-    }
-
-    private static List<Composite> buildBound(Bound bound,
-                                              List<ColumnDefinition> defs,
-                                              Restriction[] restrictions,
-                                              boolean isReversed,
-                                              CType type,
-                                              QueryOptions options) throws InvalidRequestException
-    {
-        CBuilder builder = type.builder();
-
-        // check the first restriction to see if we're dealing with a multi-column restriction
-        if (!defs.isEmpty())
-        {
-            Restriction firstRestriction = restrictions[0];
-            if (firstRestriction != null && firstRestriction.isMultiColumn())
-            {
-                if (firstRestriction.isSlice())
-                    return buildMultiColumnSliceBound(bound, defs, (MultiColumnRestriction.Slice) firstRestriction, isReversed, builder, options);
-                else if (firstRestriction.isIN())
-                    return buildMultiColumnInBound(bound, defs, (MultiColumnRestriction.IN) firstRestriction, isReversed, builder, type, options);
-                else
-                    return buildMultiColumnEQBound(bound, defs, (MultiColumnRestriction.EQ) firstRestriction, isReversed, builder, options);
-            }
-        }
-
-        CompositesBuilder compositeBuilder = new CompositesBuilder(builder, isReversed ? type.reverseComparator() : type);
-        // The end-of-component of composite doesn't depend on whether the
-        // component type is reversed or not (i.e. the ReversedType is applied
-        // to the component comparator but not to the end-of-component itself),
-        // it only depends on whether the slice is reversed
-        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
-        for (Iterator<ColumnDefinition> iter = defs.iterator(); iter.hasNext();)
-        {
-            ColumnDefinition def = iter.next();
-
-            // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
-            // So if we're doing a reverse slice, we must inverse the bounds when giving them as start and end of the slice filter.
-            // But if the actual comparator itself is reversed, we must inversed the bounds too.
-            Bound b = isReversed == isReversedType(def) ? bound : Bound.reverse(bound);
-            Restriction r = restrictions[def.position()];
-            if (isNullRestriction(r, b) || !r.canEvaluateWithSlices())
-            {
-                // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
-                // For composites, if there was preceding component and we're computing the end, we must change the last component
-                // End-Of-Component, otherwise we would be selecting only one record.
-                EOC eoc = !compositeBuilder.isEmpty() && eocBound == Bound.END ? EOC.END : EOC.NONE;
-                return compositeBuilder.buildWithEOC(eoc);
-            }
-            if (r.isSlice())
-            {
-                compositeBuilder.addElementToAll(getSliceValue(r, b, options));
-                Operator relType = ((Restriction.Slice) r).getRelation(eocBound, b);
-                return compositeBuilder.buildWithEOC(eocForRelation(relType));
-            }
-
-            compositeBuilder.addEachElementToAll(r.values(options));
-
-            if (compositeBuilder.containsNull())
-                throw new InvalidRequestException(
-                        String.format("Invalid null clustering key part %s", def.name));
-        }
-        // Means no relation at all or everything was an equal
-        // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
-        // it would be harmless to do it. However, we use this method got the partition key too. And when a query
-        // with 2ndary index is done, and with the the partition provided with an EQ, we'll end up here, and in that
-        // case using the eoc would be bad, since for the random partitioner we have no guarantee that
-        // prefix.end() will sort after prefix (see #5240).
-        EOC eoc = eocBound == Bound.END && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
-        return compositeBuilder.buildWithEOC(eoc);
-    }
-
-    private static Composite.EOC eocForRelation(Operator op)
-    {
-        switch (op)
-        {
-            case LT:
-                // < X => using startOf(X) as finish bound
-                return Composite.EOC.START;
-            case GT:
-            case LTE:
-                // > X => using endOf(X) as start bound
-                // <= X => using endOf(X) as finish bound
-                return Composite.EOC.END;
-            default:
-                // >= X => using X as start bound (could use START_OF too)
-                // = X => using X
-                return Composite.EOC.NONE;
-        }
-    }
-
-    private static List<Composite> buildMultiColumnSliceBound(Bound bound,
-                                                              List<ColumnDefinition> defs,
-                                                              MultiColumnRestriction.Slice slice,
-                                                              boolean isReversed,
-                                                              CBuilder builder,
-                                                              QueryOptions options) throws InvalidRequestException
-    {
-        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
-
-        Iterator<ColumnDefinition> iter = defs.iterator();
-        ColumnDefinition firstName = iter.next();
-        // A hack to preserve pre-6875 behavior for tuple-notation slices where the comparator mixes ASCENDING
-        // and DESCENDING orders.  This stores the bound for the first component; we will re-use it for all following
-        // components, even if they don't match the first component's reversal/non-reversal.  Note that this does *not*
-        // guarantee correct query results, it just preserves the previous behavior.
-        Bound firstComponentBound = isReversed == isReversedType(firstName) ? bound : Bound.reverse(bound);
-
-        if (!slice.hasBound(firstComponentBound))
-        {
-            Composite prefix = builder.build();
-            return Collections.singletonList(builder.remainingCount() > 0 && eocBound == Bound.END
-                    ? prefix.end()
-                    : prefix);
-        }
-
-        List<ByteBuffer> vals = slice.componentBounds(firstComponentBound, options);
-
-        ByteBuffer v = vals.get(firstName.position());
-        if (v == null)
-            throw new InvalidRequestException("Invalid null value in condition for column " + firstName.name);
-        builder.add(v);
-
-        while (iter.hasNext())
-        {
-            ColumnDefinition def = iter.next();
-            if (def.position() >= vals.size())
-                break;
-
-            v = vals.get(def.position());
-            if (v == null)
-                throw new InvalidRequestException("Invalid null value in condition for column " + def.name);
-            builder.add(v);
-        }
-        Operator relType = slice.getRelation(eocBound, firstComponentBound);
-        return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
-    }
-
-    private static List<Composite> buildMultiColumnInBound(Bound bound,
-                                                           List<ColumnDefinition> defs,
-                                                           MultiColumnRestriction.IN restriction,
-                                                           boolean isReversed,
-                                                           CBuilder builder,
-                                                           CType type,
-                                                           QueryOptions options) throws InvalidRequestException
-    {
-        List<List<ByteBuffer>> splitInValues = restriction.splitValues(options);
-        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
-
-        // The IN query might not have listed the values in comparator order, so we need to re-sort
-        // the bounds lists to make sure the slices works correctly (also, to avoid duplicates).
-        TreeSet<Composite> inValues = new TreeSet<>(isReversed ? type.reverseComparator() : type);
-        for (List<ByteBuffer> components : splitInValues)
-        {
-            for (int i = 0; i < components.size(); i++)
-                if (components.get(i) == null)
-                    throw new InvalidRequestException("Invalid null value in condition for column " + defs.get(i));
-
-            Composite prefix = builder.buildWith(components);
-            inValues.add(eocBound == Bound.END && builder.remainingCount() - components.size() > 0
-                         ? prefix.end()
-                         : prefix);
-        }
-        return new ArrayList<>(inValues);
-    }
-
-    private static List<Composite> buildMultiColumnEQBound(Bound bound,
-                                                           List<ColumnDefinition> defs,
-                                                           MultiColumnRestriction.EQ restriction,
-                                                           boolean isReversed,
-                                                           CBuilder builder,
-                                                           QueryOptions options) throws InvalidRequestException
-    {
-        Bound eocBound = isReversed ? Bound.reverse(bound) : bound;
-        List<ByteBuffer> values = restriction.values(options);
-        for (int i = 0; i < values.size(); i++)
-        {
-            ByteBuffer component = values.get(i);
-            if (component == null)
-                throw new InvalidRequestException("Invalid null value in condition for column " + defs.get(i));
-            builder.add(component);
-        }
-
-        Composite prefix = builder.build();
-        return Collections.singletonList(builder.remainingCount() > 0 && eocBound == Bound.END
-                                         ? prefix.end()
-                                         : prefix);
-    }
-
-    private static boolean isNullRestriction(Restriction r, Bound b)
-    {
-        return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
-    }
-
-    private static ByteBuffer getSliceValue(Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
-    {
-        Restriction.Slice slice = (Restriction.Slice)r;
-        assert slice.hasBound(b);
-        ByteBuffer val = slice.bound(b, options);
-        if (val == null)
-            throw new InvalidRequestException(String.format("Invalid null clustering key part %s", r));
-        return val;
-    }
-
-    private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
-    {
-        assert isColumnRange();
-        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
-    }
-
     public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException
     {
-        if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
+        if (!restrictions.usesSecondaryIndexing())
             return Collections.emptyList();
 
-        List<IndexExpression> expressions = new ArrayList<IndexExpression>();
-        for (ColumnDefinition def : restrictedColumns)
-        {
-            Restriction restriction;
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    restriction = keyRestrictions[def.position()];
-                    break;
-                case CLUSTERING_COLUMN:
-                    restriction = columnRestrictions[def.position()];
-                    break;
-                case REGULAR:
-                case STATIC:
-                    restriction = metadataRestrictions.get(def.name);
-                    break;
-                default:
-                    // We don't allow restricting a COMPACT_VALUE for now in prepare.
-                    throw new AssertionError();
-            }
-
-            if (restriction.isSlice())
-            {
-                Restriction.Slice slice = (Restriction.Slice)restriction;
-                for (Bound b : Bound.values())
-                {
-                    if (slice.hasBound(b))
-                    {
-                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
-                        Operator op = slice.getIndexOperator(b);
-                        // If the underlying comparator for name is reversed, we need to reverse the IndexOperator: user operation
-                        // always refer to the "forward" sorting even if the clustering order is reversed, but the 2ndary code does
-                        // use the underlying comparator as is.
-                        if (def.type instanceof ReversedType)
-                            op = reverse(op);
-                        expressions.add(new IndexExpression(def.name.bytes, op, value));
-                    }
-                }
-            }
-            else if (restriction.isContains())
-            {
-                SingleColumnRestriction.Contains contains = (SingleColumnRestriction.Contains)restriction;
-                for (ByteBuffer value : contains.values(options))
-                {
-                    validateIndexedValue(def, value);
-                    expressions.add(new IndexExpression(def.name.bytes, Operator.CONTAINS, value));
-                }
-                for (ByteBuffer key : contains.keys(options))
-                {
-                    validateIndexedValue(def, key);
-                    expressions.add(new IndexExpression(def.name.bytes, Operator.CONTAINS_KEY, key));
-                }
-            }
-            else
-            {
-                List<ByteBuffer> values = restriction.values(options);
-
-                if (values.size() != 1)
-                    throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
+        List<IndexExpression> expressions = restrictions.getIndexExpressions(options);
 
-                ByteBuffer value = validateIndexedValue(def, values.get(0));
-                expressions.add(new IndexExpression(def.name.bytes, Operator.EQ, value));
-            }
-        }
+        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
+        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
+        secondaryIndexManager.validateIndexSearchersForQuery(expressions);
 
-        if (usesSecondaryIndexing)
-        {
-            ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
-            SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
-            secondaryIndexManager.validateIndexSearchersForQuery(expressions);
-        }
-        
         return expressions;
     }
 
-    private static ByteBuffer validateIndexedValue(ColumnDefinition def, ByteBuffer value) throws InvalidRequestException
-    {
-        if (value == null)
-            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
-        if (value.remaining() > 0xFFFF)
-            throw new InvalidRequestException("Index expression values may not be larger than 64K");
-        return value;
-    }
-
     private CellName makeExclusiveSliceBound(Bound bound, CellNameType type, QueryOptions options) throws InvalidRequestException
     {
-        if (sliceRestriction.isInclusive(bound))
+        if (restrictions.areRequestedBoundsInclusive(bound))
             return null;
 
-        if (sliceRestriction.isMultiColumn())
-            return type.makeCellName(((MultiColumnRestriction.Slice) sliceRestriction).componentBounds(bound, options).toArray());
-        else
-            return type.makeCellName(sliceRestriction.bound(bound, options));
+       return type.makeCellName(restrictions.getClusteringColumnsBounds(bound, options).get(0));
     }
 
     private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
     {
-        assert sliceRestriction != null;
-
         final CellNameType type = cfm.comparator;
+
         final CellName excludedStart = makeExclusiveSliceBound(Bound.START, type, options);
         final CellName excludedEnd = makeExclusiveSliceBound(Bound.END, type, options);
 
-        return new AbstractIterator<Cell>()
+        return Iterators.filter(cells, new Predicate<Cell>()
         {
-            protected Cell computeNext()
+            public boolean apply(Cell c)
             {
-                while (cells.hasNext())
-                {
-                    Cell c = cells.next();
-
-                    // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
-                    // the comparator is composite that is)), filter here
-                    if ( (excludedStart != null && type.compare(c.name(), excludedStart) == 0)
-                      || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0) )
-                        continue;
-
-                    return c;
-                }
-                return endOfData();
+                // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
+                // the comparator is composite that is)), filter here
+                return !((excludedStart != null && type.compare(c.name(), excludedStart) == 0)
+                            || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0));
             }
-        };
-    }
-
-    private static Operator reverse(Operator op)
-    {
-        switch (op)
-        {
-            case LT:  return Operator.GT;
-            case LTE: return Operator.GTE;
-            case GT:  return Operator.LT;
-            case GTE: return Operator.LTE;
-            default: return op;
-        }
+        });
     }
 
     private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
@@ -1178,7 +609,7 @@ public class SelectStatement implements CQLStatement
         }
 
         Iterator<Cell> cells = cf.getSortedColumns().iterator();
-        if (sliceRestriction != null)
+        if (restrictions.isNonCompositeSliceWithExclusiveBounds())
             cells = applySliceRestriction(cells, options);
 
         CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
@@ -1187,7 +618,7 @@ public class SelectStatement implements CQLStatement
         // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
         // then we want to include the static columns in the result set (and we're done).
         CQL3Row staticRow = iter.getStaticRow();
-        if (staticRow != null && !iter.hasNext() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
+        if (staticRow != null && !iter.hasNext() && !restrictions.usesSecondaryIndexing() && restrictions.hasNoClusteringColumnsRestriction())
         {
             result.newRow(options.getProtocolVersion());
             for (ColumnDefinition def : selection.getColumns())
@@ -1259,18 +690,10 @@ public class SelectStatement implements CQLStatement
         result.add(row.getColumn(def.name));
     }
 
-    private boolean hasNoClusteringColumnsRestriction()
-    {
-        for (int i = 0; i < columnRestrictions.length; i++)
-            if (columnRestrictions[i] != null)
-                return false;
-        return true;
-    }
-
     private boolean needsPostQueryOrdering()
     {
         // We need post-query ordering only for queries with IN on the partition key and an ORDER BY.
-        return keyIsInRelation && !parameters.orderings.isEmpty();
+        return restrictions.keyIsInRelation() && !parameters.orderings.isEmpty();
     }
 
     /**
@@ -1281,83 +704,7 @@ public class SelectStatement implements CQLStatement
         if (cqlRows.size() == 0 || !needsPostQueryOrdering())
             return;
 
-        assert orderingIndexes != null;
-
-        List<Integer> idToSort = new ArrayList<Integer>();
-        List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>();
-
-        for (ColumnIdentifier.Raw identifier : parameters.orderings.keySet())
-        {
-            ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier.prepare(cfm));
-            idToSort.add(orderingIndexes.get(orderingColumn.name));
-            sorters.add(orderingColumn.type);
-        }
-
-        Comparator<List<ByteBuffer>> comparator = idToSort.size() == 1
-                                                ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
-                                                : new CompositeComparator(sorters, idToSort);
-        Collections.sort(cqlRows.rows, comparator);
-    }
-
-    private static boolean isReversedType(ColumnDefinition def)
-    {
-        return def.type instanceof ReversedType;
-    }
-
-    private boolean columnFilterIsIdentity()
-    {
-        for (Restriction r : columnRestrictions)
-        {
-            if (r != null)
-                return false;
-        }
-        return true;
-    }
-
-    private boolean hasClusteringColumnsRestriction()
-    {
-        for (int i = 0; i < columnRestrictions.length; i++)
-            if (columnRestrictions[i] != null)
-                return true;
-        return false;
-    }
-
-    private void validateDistinctSelection()
-    throws InvalidRequestException
-    {
-        Collection<ColumnDefinition> requestedColumns = selection.getColumns();
-        for (ColumnDefinition def : requestedColumns)
-            if (def.kind != ColumnDefinition.Kind.PARTITION_KEY && def.kind != ColumnDefinition.Kind.STATIC)
-                throw new InvalidRequestException(String.format("SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)", def.name));
-
-        // If it's a key range, we require that all partition key columns are selected so we don't have to bother with post-query grouping.
-        if (!isKeyRange)
-            return;
-
-        for (ColumnDefinition def : cfm.partitionKeyColumns())
-            if (!requestedColumns.contains(def))
-                throw new InvalidRequestException(String.format("SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name));
-    }
-
-    /**
-     * Checks if the specified column is restricted by multiple contains or contains key.
-     *
-     * @param columnDef the definition of the column to check
-     * @return <code>true</code> the specified column is restricted by multiple contains or contains key,
-     * <code>false</code> otherwise
-     */
-    private boolean isRestrictedByMultipleContains(ColumnDefinition columnDef)
-    {
-        if (!columnDef.type.isCollection())
-            return false;
-
-        Restriction restriction = metadataRestrictions.get(columnDef.name);
-
-        if (!(restriction instanceof Contains))
-            return false;
-
-        Contains contains = (Contains) restriction;
-        return (contains.numberOfValues() + contains.numberOfKeys()) > 1;
+        Collections.sort(cqlRows.rows, orderingComparator);
     }
 
     public static class RawStatement extends CFStatement
@@ -1385,136 +732,65 @@ public class SelectStatement implements CQLStatement
                                 ? Selection.wildcard(cfm)
                                 : Selection.fromSelectors(cfm, selectClause);
 
-            SelectStatement stmt = new SelectStatement(cfm, boundNames.size(), parameters, selection, prepareLimit(boundNames));
-
-            /*
-             * WHERE clause. For a given entity, rules are:
-             *   - EQ relation conflicts with anything else (including a 2nd EQ)
-             *   - Can't have more than one LT(E) relation (resp. GT(E) relation)
-             *   - IN relation are restricted to row keys (for now) and conflicts with anything else
-             *     (we could allow two IN for the same entity but that doesn't seem very useful)
-             *   - The value_alias cannot be restricted in any way (we don't support wide rows with indexed value in CQL so far)
-             */
-            boolean hasQueriableIndex = false;
-            boolean hasQueriableClusteringColumnIndex = false;
-            boolean hasSingleColumnRelations = false;
-            boolean hasMultiColumnRelations = false;
-
-            ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
-            SecondaryIndexManager indexManager = cfs.indexManager;
-
-            for (Relation relation : whereClause)
-            {
-                if (relation.isMultiColumn())
-                {
-                    MultiColumnRelation rel = (MultiColumnRelation) relation;
-                    List<ColumnDefinition> names = new ArrayList<>(rel.getEntities().size());
-                    for (ColumnIdentifier.Raw rawEntity : rel.getEntities())
-                    {
-                        ColumnIdentifier entity = rawEntity.prepare(cfm);
-                        ColumnDefinition def = cfm.getColumnDefinition(entity);
-                        boolean[] queriable = processRelationEntity(stmt, indexManager, relation, entity, def);
-                        hasQueriableIndex |= queriable[0];
-                        hasQueriableClusteringColumnIndex |= queriable[1];
-                        names.add(def);
-                        hasMultiColumnRelations |= ColumnDefinition.Kind.CLUSTERING_COLUMN == def.kind;
-                    }
-                    updateRestrictionsForRelation(stmt, names, rel, boundNames);
-                }
-                else
-                {
-                    SingleColumnRelation rel = (SingleColumnRelation) relation;
-                    ColumnIdentifier entity = rel.getEntity().prepare(cfm);
-                    ColumnDefinition def = cfm.getColumnDefinition(entity);
-                    boolean[] queriable = processRelationEntity(stmt, indexManager, relation, entity, def);
-                    hasQueriableIndex |= queriable[0];
-                    hasQueriableClusteringColumnIndex |= queriable[1];
-                    hasSingleColumnRelations |= ColumnDefinition.Kind.CLUSTERING_COLUMN == def.kind;
-                    updateRestrictionsForRelation(stmt, def, rel, boundNames);
-                }
-            }
-            if (hasSingleColumnRelations && hasMultiColumnRelations)
-                throw new InvalidRequestException("Mixing single column relations and multi column relations on clustering columns is not allowed");
+            StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection);
 
-             // At this point, the select statement if fully constructed, but we still have a few things to validate
-            processPartitionKeyRestrictions(stmt, hasQueriableIndex, cfm);
-
-            // All (or none) of the partition key columns have been specified;
-            // hence there is no need to turn these restrictions into index expressions.
-            if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedColumns.removeAll(cfm.partitionKeyColumns());
-
-            if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
-                throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
-
-            processColumnRestrictions(stmt, hasQueriableIndex, cfm);
+            if (parameters.isDistinct)
+                validateDistinctSelection(cfm, selection, restrictions);
 
-            // Covers indexes on the first clustering column (among others).
-            if (stmt.isKeyRange && hasQueriableClusteringColumnIndex)
-                stmt.usesSecondaryIndexing = true;
+            Comparator<List<ByteBuffer>> orderingComparator = null;
+            boolean isReversed = false;
 
-            if (!stmt.usesSecondaryIndexing)
+            if (!parameters.orderings.isEmpty())
             {
-                for (ColumnDefinition def : cfm.clusteringColumns())
-                {
-                    // Remove clustering column restrictions that can be handled by slices; the remainder will be
-                    // handled by filters (which may require a secondary index).
-                    Restriction restriction = stmt.columnRestrictions[def.position()];
-                    if (restriction != null)
-                    {
-                        if (restriction.canEvaluateWithSlices())
-                            stmt.restrictedColumns.remove(def);
-                        else
-                            stmt.usesSecondaryIndexing = true;
-                    }
-                }
+                verifyOrderingIsAllowed(restrictions);
+                orderingComparator = getOrderingComparator(cfm, selection, restrictions);
+                isReversed = isReversed(cfm);
             }
 
-            // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
-            // there are restrictions not covered by the PK.
-            if (!stmt.metadataRestrictions.isEmpty())
-                stmt.usesSecondaryIndexing = true;
-
-            if (stmt.usesSecondaryIndexing)
-                validateSecondaryIndexSelections(stmt);
-
-            if (!stmt.parameters.orderings.isEmpty())
-                processOrderingClause(stmt, cfm);
+            if (isReversed)
+                restrictions.reverse();
 
-            checkNeedsFiltering(stmt);
+            checkNeedsFiltering(restrictions);
 
-            if (parameters.isDistinct)
-                stmt.validateDistinctSelection();
+            SelectStatement stmt = new SelectStatement(cfm,
+                                                        boundNames.size(),
+                                                        parameters,
+                                                        selection,
+                                                        restrictions,
+                                                        isReversed,
+                                                        orderingComparator,
+                                                        prepareLimit(boundNames));
 
             return new ParsedStatement.Prepared(stmt, boundNames);
         }
 
-        /** Returns a pair of (hasQueriableIndex, hasQueriableClusteringColumnIndex) */
-        private boolean[] processRelationEntity(SelectStatement stmt,
-                                                SecondaryIndexManager indexManager,
-                                                Relation relation,
-                                                ColumnIdentifier entity,
-                                                ColumnDefinition def) throws InvalidRequestException
-        {
-            if (def == null)
-                handleUnrecognizedEntity(entity, relation);
-
-            stmt.restrictedColumns.add(def);
-
-            SecondaryIndex index = indexManager.getIndexForColumn(def.name.bytes);
-            if (index != null && index.supportsOperator(relation.operator()))
-                return new boolean[]{true, def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN};
-
-            return new boolean[]{false, false};
-        }
-
-        /** Throws an InvalidRequestException for an unrecognized identifier in the WHERE clause */
-        private void handleUnrecognizedEntity(ColumnIdentifier entity, Relation relation) throws InvalidRequestException
+        /**
+         * Prepares the restrictions.
+         *
+         * @param cfm the column family meta data
+         * @param boundNames the variable specifications
+         * @param selection the selection
+         * @return the restrictions
+         * @throws InvalidRequestException if a problem occurs while building the restrictions
+         */
+        private StatementRestrictions prepareRestrictions(CFMetaData cfm,
+                                                          VariableSpecifications boundNames,
+                                                          Selection selection) throws InvalidRequestException
         {
-            if (containsAlias(entity))
-                throw new InvalidRequestException(String.format("Aliases aren't allowed in the where clause ('%s')", relation));
-            else
-                throw new InvalidRequestException(String.format("Undefined name %s in where clause ('%s')", entity, relation));
+            try
+            {
+                return new StatementRestrictions(cfm,
+                                                 whereClause,
+                                                 boundNames,
+                                                 selection.containsOnlyStaticColumns(),
+                                                 selection.containsACollection());
+            }
+            catch (UnrecognizedEntityException e)
+            {
+                if (containsAlias(e.entity))
+                    throw invalidRequest("Aliases aren't allowed in the where clause ('%s')", e.relation);
+                throw e;
+            }
         }
 
         /** Returns a Term for the limit or null if no limit is set */
@@ -1528,478 +804,89 @@ public class SelectStatement implements CQLStatement
             return prepLimit;
         }
 
-        private void updateRestrictionsForRelation(SelectStatement stmt, List<ColumnDefinition> defs, MultiColumnRelation relation, VariableSpecifications boundNames) throws InvalidRequestException
+        private static void verifyOrderingIsAllowed(StatementRestrictions restrictions) throws InvalidRequestException
         {
-            List<ColumnDefinition> restrictedColumns = new ArrayList<>();
-            Set<ColumnDefinition> seen = new HashSet<>(defs.size());
-
-            int previousPosition = -1;
-            for (ColumnDefinition def : defs)
-            {
-                // ensure multi-column restriction only applies to clustering columns
-                if (def.kind != ColumnDefinition.Kind.CLUSTERING_COLUMN)
-                    throw new InvalidRequestException(String.format("Multi-column relations can only be applied to clustering columns: %s", def));
-
-                if (seen.contains(def))
-                    throw new InvalidRequestException(String.format("Column \"%s\" appeared twice in a relation: %s", def, relation));
-                seen.add(def);
-
-                // check that no clustering columns were skipped
-                if (def.position() != previousPosition + 1)
-                {
-                    if (previousPosition == -1)
-                        throw new InvalidRequestException(String.format(
-                                "Clustering columns may not be skipped in multi-column relations. " +
-                                "They should appear in the PRIMARY KEY order. Got %s", relation));
-                    else
-                        throw new InvalidRequestException(String.format(
-                                "Clustering columns must appear in the PRIMARY KEY order in multi-column relations: %s", relation));
-                }
-                previousPosition++;
-
-                Restriction existing = getExistingRestriction(stmt, def);
-                Operator operator = relation.operator();
-                if (existing != null)
-                {
-                    if (operator == Operator.EQ || operator == Operator.IN)
-                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by more than one relation if it is in an %s relation", def, relation.operator()));
-                    else if (!existing.isSlice())
-                        throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by an equality relation and an inequality relation", def));
-                }
-                restrictedColumns.add(def);
-            }
-
-            switch (relation.operator())
-            {
-                case EQ:
-                {
-                    Term t = relation.getValue().prepare(keyspace(), defs);
-                    t.collectMarkerSpecification(boundNames);
-                    Restriction restriction = new MultiColumnRestriction.EQ(t, false);
-                    for (ColumnDefinition def : restrictedColumns)
-                        stmt.columnRestrictions[def.position()] = restriction;
-                    break;
-                }
-                case IN:
-                {
-                    Restriction restriction;
-                    List<? extends Term.MultiColumnRaw> inValues = relation.getInValues();
-                    if (inValues != null)
-                    {
-                        // we have something like "(a, b, c) IN ((1, 2, 3), (4, 5, 6), ...) or
-                        // "(a, b, c) IN (?, ?, ?)
-                        List<Term> terms = new ArrayList<>(inValues.size());
-                        for (Term.MultiColumnRaw tuple : inValues)
-                        {
-                            Term t = tuple.prepare(keyspace(), defs);
-                            t.collectMarkerSpecification(boundNames);
-                            terms.add(t);
-                        }
-                         restriction = new MultiColumnRestriction.InWithValues(terms);
-                    }
-                    else
-                    {
-                        Tuples.INRaw rawMarker = relation.getInMarker();
-                        AbstractMarker t = rawMarker.prepare(keyspace(), defs);
-                        t.collectMarkerSpecification(boundNames);
-                        restriction = new MultiColumnRestriction.InWithMarker(t);
-                    }
-                    for (ColumnDefinition def : restrictedColumns)
-                        stmt.columnRestrictions[def.position()] = restriction;
-
-                    break;
-                }
-                case LT:
-                case LTE:
-                case GT:
-                case GTE:
-                {
-                    Term t = relation.getValue().prepare(keyspace(), defs);
-                    t.collectMarkerSpecification(boundNames);
-                    for (ColumnDefinition def : defs)
-                    {
-                        Restriction.Slice restriction = (Restriction.Slice)getExistingRestriction(stmt, def);
-                        if (restriction == null)
-                            restriction = new MultiColumnRestriction.Slice(false);
-                        else if (!restriction.isMultiColumn())
-                            throw new InvalidRequestException(String.format("Column \"%s\" cannot have both tuple-notation inequalities and single-column inequalities: %s", def.name, relation));
-                        restriction.setBound(def.name, relation.operator(), t);
-                        stmt.columnRestrictions[def.position()] = restriction;
-                    }
-                    break;
-                }
-                case NEQ:
-                    throw new InvalidRequestException(String.format("Unsupported \"!=\" relation: %s", relation));
-            }
+            checkFalse(restrictions.usesSecondaryIndexing(), "ORDER BY with 2ndary indexes is not supported.");
+            checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported when the partition key is restricted by an EQ or an IN.");
         }
 
-        private Restriction getExistingRestriction(SelectStatement stmt, ColumnDefinition def)
+        private static void validateDistinctSelection(CFMetaData cfm,
+                                                      Selection selection,
+                                                      StatementRestrictions restrictions)
+                                                      throws InvalidRequestException
         {
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    return stmt.keyRestrictions[def.position()];
-                case CLUSTERING_COLUMN:
-                    return stmt.columnRestrictions[def.position()];
-                case REGULAR:
-                case STATIC:
-                    return stmt.metadataRestrictions.get(def.name);
-                default:
-                    throw new AssertionError();
-            }
-        }
+            Collection<ColumnDefinition> requestedColumns = selection.getColumns();
+            for (ColumnDefinition def : requestedColumns)
+                checkFalse(!def.isPartitionKey() && !def.isStatic(),
+                           "SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)",
+                           def.name);
 
-        private void updateRestrictionsForRelation(SelectStatement stmt, ColumnDefinition def, SingleColumnRelation relation, VariableSpecifications names) throws InvalidRequestException
-        {
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    stmt.keyRestrictions[def.position()] = updateSingleColumnRestriction(def, stmt.keyRestrictions[def.position()], relation, names);
-                    break;
-                case CLUSTERING_COLUMN:
-                    stmt.columnRestrictions[def.position()] = updateSingleColumnRestriction(def, stmt.columnRestrictions[def.position()], relation, names);
-                    break;
-                case COMPACT_VALUE:
-                    throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", def.name));
-                case REGULAR:
-                case STATIC:
-                    // We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
-                    Restriction r = updateSingleColumnRestriction(def, stmt.metadataRestrictions.get(def.name), relation, names);
-                    if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
-                        // Note: for backward compatibility reason, we conside a IN of 1 value the same as a EQ, so we let that slide.
-                        throw new InvalidRequestException(String.format("IN predicates on non-primary-key columns (%s) is not yet supported", def.name));
-                    stmt.metadataRestrictions.put(def.name, r);
-                    break;
-            }
-        }
-
-        Restriction updateSingleColumnRestriction(ColumnDefinition def, Restriction existingRestriction, SingleColumnRelation newRel, VariableSpecifications boundNames) throws InvalidRequestException
-        {
-            ColumnSpecification receiver = def;
-            if (newRel.onToken)
-            {
-                if (def.kind != ColumnDefinition.Kind.PARTITION_KEY)
-                    throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", def.name));
+            // If it's a key range, we require that all partition key columns are selected so we don't have to bother
+            // with post-query grouping.
+            if (!restrictions.isKeyRange())
+                return;
 
-                receiver = new ColumnSpecification(def.ksName,
-                                                   def.cfName,
-                                                   new ColumnIdentifier("partition key token", true),
-                                                   StorageService.getPartitioner().getTokenValidator());
-            }
-
-            // We don't support relations against entire collections (unless they're frozen), like "numbers = {1, 2, 3}"
-            if (receiver.type.isCollection() && receiver.type.isMultiCell() && !(newRel.operator() == Operator.CONTAINS_KEY || newRel.operator() == Operator.CONTAINS))
-            {
-                throw new InvalidRequestException(String.format("Collection column '%s' (%s) cannot be restricted by a '%s' relation",
-                                                                def.name, receiver.type.asCQL3Type(), newRel.operator()));
-            }
-
-            switch (newRel.operator())
-            {
-                case EQ:
-                {
-                    if (existingRestriction != null)
-                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", def.name));
-                    Term t = newRel.getValue().prepare(keyspace(), receiver);
-                    t.collectMarkerSpecification(boundNames);
-                    existingRestriction = new SingleColumnRestriction.EQ(t, newRel.onToken);
-                }
-                break;
-                case IN:
-                    if (existingRestriction != null)
-                        throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes a IN", def.name));
-
-                    if (newRel.getInValues() == null)
-                    {
-                        // Means we have a "SELECT ... IN ?"
-                        assert newRel.getValue() != null;
-                        Term t = newRel.getValue().prepare(keyspace(), receiver);
-                        t.collectMarkerSpecification(boundNames);
-                        existingRestriction = new SingleColumnRestriction.InWithMarker((Lists.Marker)t);
-                    }
-                    else
-                    {
-                        List<Term> inValues = new ArrayList<>(newRel.getInValues().size());
-                        for (Term.Raw raw : newRel.getInValues())
-                        {
-                            Term t = raw.prepare(keyspace(), receiver);
-                            t.collectMarkerSpecification(boundNames);
-                            inValues.add(t);
-                        }
-                        existingRestriction = new SingleColumnRestriction.InWithValues(inValues);
-                    }
-                    break;
-                case NEQ:
-                    throw new InvalidRequestException(String.format("Unsupported \"!=\" relation on column \"%s\"", def.name));
-                case GT:
-                case GTE:
-                case LT:
-                case LTE:
-                    {
-                        if (existingRestriction == null)
-                            existingRestriction = new SingleColumnRestriction.Slice(newRel.onToken);
-                        else if (!existingRestriction.isSlice())
-                            throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by both an equality and an inequality relation", def.name));
-                        else if (existingRestriction.isMultiColumn())
-                            throw new InvalidRequestException(String.format("Column \"%s\" cannot be restricted by both a tuple notation inequality and a single column inequality (%s)", def.name, newRel));
-                        else if (existingRestriction.isOnToken() != newRel.onToken)
-                            // For partition keys, we shouldn't have slice restrictions without token(). And while this is rejected later by
-                            // processPartitionKeysRestrictions, we shouldn't update the existing restriction by the new one if the old one was using token()
-                            // and the new one isn't since that would bypass that later test.
-                            throw new InvalidRequestException("Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
-
-                        Term t = newRel.getValue().prepare(keyspace(), receiver);
-                        t.collectMarkerSpecification(boundNames);
-                        ((SingleColumnRestriction.Slice)existingRestriction).setBound(def.name, newRel.operator(), t);
-                    }
-                    break;
-                case CONTAINS_KEY:
-                    if (!(receiver.type instanceof MapType))
-                        throw new InvalidRequestException(String.format("Cannot use CONTAINS KEY on non-map column %s", def.name));
-                    // Fallthrough on purpose
-                case CONTAINS:
-                {
-                    if (!receiver.type.isCollection())
-                        throw new InvalidRequestException(String.format("Cannot use %s relation on non collection column %s", newRel.operator(), def.name));
-
-                    if (existingRestriction == null)
-                        existingRestriction = new SingleColumnRestriction.Contains();
-                    else if (!existingRestriction.isContains())
-                        throw new InvalidRequestException(String.format("Collection column %s can only be restricted by CONTAINS or CONTAINS KEY", def.name));
-
-                    boolean isKey = newRel.operator() == Operator.CONTAINS_KEY;
-                    receiver = makeCollectionReceiver(receiver, isKey);
-                    Term t = newRel.getValue().prepare(keyspace(), receiver);
-                    t.collectMarkerSpecification(boundNames);
-                    ((SingleColumnRestriction.Contains)existingRestriction).add(t, isKey);
-                    break;
-                }
-            }
-            return existingRestriction;
+            for (ColumnDefinition def : cfm.partitionKeyColumns())
+                checkTrue(requestedColumns.contains(def),
+                          "SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name);
         }
 
-        private void processPartitionKeyRestrictions(SelectStatement stmt, boolean hasQueriableIndex, CFMetaData cfm) throws InvalidRequestException
+        private void handleUnrecognizedOrderingColumn(ColumnIdentifier column) throws InvalidRequestException
         {
-            // If there is a queriable index, no special condition are required on the other restrictions.
-            // But we still need to know 2 things:
-            //   - If we don't have a queriable index, is the query ok
-            //   - Is it queriable without 2ndary index, which is always more efficient
-            // If a component of the partition key is restricted by a relation, all preceding
-            // components must have a EQ. Only the last partition key component can be in IN relation.
-            boolean canRestrictFurtherComponents = true;
-            ColumnDefinition previous = null;
-            stmt.keyIsInRelation = false;
-            Iterator<ColumnDefinition> iter = cfm.partitionKeyColumns().iterator();
-            for (int i = 0; i < stmt.keyRestrictions.length; i++)
-            {
-                ColumnDefinition cdef = iter.next();
-                Restriction restriction = stmt.keyRestrictions[i];
-
-                if (restriction == null)
-                {
-                    if (stmt.onToken)
-                        throw new InvalidRequestException("The token() function must be applied to all partition key components or none of them");
-
-                    // The only time not restricting a key part is allowed is if none are restricted or an index is used.
-                    if (i > 0 && stmt.keyRestrictions[i - 1] != null)
-                    {
-                        if (hasQueriableIndex)
-                        {
-                            stmt.usesSecondaryIndexing = true;
-                            stmt.isKeyRange = true;
-                            break;
-                        }
-                        throw new InvalidRequestException(String.format("Partition key part %s must be restricted since preceding part is", cdef.name));
-                    }
-
-                    stmt.isKeyRange = true;
-                    canRestrictFurtherComponents = false;
-                }
-                else if (!canRestrictFurtherComponents)
-                {
-                    if (hasQueriableIndex)
-                    {
-                        stmt.usesSecondaryIndexing = true;
-                        break;
-                    }
-                    throw new InvalidRequestException(String.format(
-                            "Partitioning column \"%s\" cannot be restricted because the preceding column (\"%s\") is " +
-                            "either not restricted or is restricted by a non-EQ relation", cdef.name, previous));
-                }
-                else if (restriction.isOnToken())
-                {
-                    // If this is a query on tokens, it's necessarily a range query (there can be more than one key per token).
-                    stmt.isKeyRange = true;
-                    stmt.onToken = true;
-                }
-                else if (stmt.onToken)
-                {
-                    throw new InvalidRequestException(String.format("The token() function must be applied to all partition key components or none of them"));
-                }
-                else if (!restriction.isSlice())
-                {
-                    if (restriction.isIN())
-                    {
-                        // We only support IN for the last name so far
-                        if (i != stmt.keyRestrictions.length - 1)
-                            throw new InvalidRequestException(String.format("Partition KEY part %s cannot be restricted by IN relation (only the last part of the partition key can)", cdef.name));
-                        stmt.keyIsInRelation = true;
-                    }
-                }
-                else
-                {
-                    // Non EQ relation is not supported without token(), even if we have a 2ndary index (since even those are ordered by partitioner).
-                    // Note: In theory we could allow it for 2ndary index queries with ALLOW FILTERING, but that would probably require some special casing
-                    // Note bis: This is also why we don't bother handling the 'tuple' notation of #4851 for keys. If we lift the limitation for 2ndary
-                    // index with filtering, we'll need to handle it though.
-                    throw new InvalidRequestException("Only EQ and IN relation are supported on the partition key (unless you use the token() function)");
-                }
-                previous = cdef;
-            }
-
-            if (stmt.onToken)
-                checkTokenFunctionArgumentsOrder(cfm);
+            checkFalse(containsAlias(column), "Aliases are not allowed in order by clause ('%s')", column);
+            checkFalse(true, "Order by on unknown column %s", column);
         }
 
-        /**
-         * Checks that the column identifiers used as argument for the token function have been specified in the
-         * partition key order.
-         * @param cfm the Column Family MetaData
-         * @throws InvalidRequestException if the arguments have not been provided in the proper order.
-         */
-        private void checkTokenFunctionArgumentsOrder(CFMetaData cfm) throws InvalidRequestException
+        private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData cfm,
+                                                                   Selection selection,
+                                                                   StatementRestrictions restrictions)
+                                                                   throws InvalidRequestException
         {
-            Iterator<ColumnDefinition> iter = Iterators.cycle(cfm.partitionKeyColumns());
-            for (Relation relation : whereClause)
-            {
-                SingleColumnRelation singleColumnRelation = (SingleColumnRelation) relation;
-                if (singleColumnRelation.onToken && !cfm.getColumnDefinition(singleColumnRelation.getEntity().prepare(cfm)).equals(iter.next()))
-                    throw new InvalidRequestException(String.format("The token function arguments must be in the partition key order: %s",
-                                                                    Joiner.on(',').join(cfm.partitionKeyColumns())));
-            }
-        }
+            if (!restrictions.keyIsInRelation())
+                return null;
 
-        private void processColumnRestrictions(SelectStatement stmt, boolean hasQueriableIndex, CFMetaData cfm) throws InvalidRequestException
-        {
-            // If a clustering key column is restricted by a non-EQ relation, all preceding
-            // columns must have a EQ, and all following must have no restriction. Unless
-            // the column is indexed that is.
-            boolean canRestrictFurtherComponents = true;
-            ColumnDefinition previous = null;
-            boolean previousIsSlice = false;
-            Iterator<ColumnDefinition> iter = cfm.clusteringColumns().iterator();
-            for (int i = 0; i < stmt.columnRestrictions.length; i++)
-            {
-                ColumnDefinition cdef = iter.next();
-                Restriction restriction = stmt.columnRestrictions[i];
+            Map<ColumnIdentifier, Integer> orderingIndexes = getOrderingIndex(cfm, selection);
 
-                if (restriction == null)
-                {
-                    canRestrictFurtherComponents = false;
-                    previousIsSlice = false;
-                }
-                else if (!canRestrictFurtherComponents)
-                {
-                    // We're here if the previous clustering column was either not restricted or was a slice.
-                    // We can't restrict the current column unless:
-                    //   1) we're in the special case of the 'tuple' notation from #4851 which we expand as multiple
-                    //      consecutive slices: in which case we're good with this restriction and we continue
-                    //   2) we have a 2ndary index, in which case we have to use it but can skip more validation
-                    if (!(previousIsSlice && restriction.isSlice() && restriction.isMultiColumn()))
-                    {
-                        if (hasQueriableIndex)
-                        {
-                            stmt.usesSecondaryIndexing = true; // handle gaps and non-keyrange cases.
-                            break;
-                        }
-                        throw new InvalidRequestException(String.format(
-                                "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is either not restricted or by a non-EQ relation)", cdef.name, previous));
-                    }
-                }
-                else if (restriction.isSlice())
-                {
-                    canRestrictFurtherComponents = false;
-                    previousIsSlice = true;
-                    Restriction.Slice slice = (Restriction.Slice)restriction;
-                    // For non-composite slices, we don't support internally the difference between exclusive and
-                    // inclusive bounds, so we deal with it manually.
-                    if (!cfm.comparator.isCompound() && (!slice.isInclusive(Bound.START) || !slice.isInclusive(Bound.END)))
-                        stmt.sliceRestriction = slice;
-                }
-                else if (restriction.isIN())
-                {
-                    if (stmt.selectACollection())
-                        throw new InvalidRequestException(String.format("Cannot restrict column \"%s\" by IN relation as a collection is selected by the query", cdef.name));
-                }
-                else if (restriction.isContains())
-                {
-                    if (!hasQueriableIndex)
-                        throw new InvalidRequestException(String.format("Cannot restrict column \"%s\" by a CONTAINS relation without a secondary index", cdef.name));
-                    stmt.usesSecondaryIndexing = true;
-                }
+            List<Integer> idToSort = new ArrayList<Integer>();
+            List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>();
 
-                previous = cdef;
+            for (ColumnIdentifier.Raw raw : parameters.orderings.keySet())
+            {
+                ColumnIdentifier identifier = raw.prepare(cfm);
+                ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier);
+                idToSort.add(orderingIndexes.get(orderingColumn.name));
+                sorters.add(orderingColumn.type);
             }
+            return idToSort.size() == 1 ? new SingleColumnComparator(idToSort.get(0), sorters.get(0))
+                    : new CompositeComparator(sorters, idToSort);
         }
 
-        private void validateSecondaryIndexSelections(SelectStatement stmt) throws InvalidRequestException
+        private Map<ColumnIdentifier, Integer> getOrderingIndex(CFMetaData cfm, Selection selection)
+                throws InvalidRequestException
         {
-            if (stmt.keyIsInRelation)
-                throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
-            // When the user only select static columns, the intent is that we don't query the whole partition but just
-            // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on static columns
-            // so far, 2i means that you've

<TRUNCATED>