You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:10 UTC

[46/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index ff685cf..b4d7853 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterators;
@@ -27,7 +26,8 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.Pair;
 
@@ -46,44 +46,57 @@ public class DeleteStatement extends ModificationStatement
         return false;
     }
 
-    public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params)
+    public void addUpdateForKey(PartitionUpdate update, CBuilder cbuilder, UpdateParameters params)
     throws InvalidRequestException
     {
-        List<Operation> deletions = getOperations();
+        List<Operation> regularDeletions = getRegularOperations();
+        List<Operation> staticDeletions = getStaticOperations();
 
-        if (prefix.size() < cfm.clusteringColumns().size() && !deletions.isEmpty())
+        if (regularDeletions.isEmpty() && staticDeletions.isEmpty())
         {
-            // In general, we can't delete specific columns if not all clustering columns have been specified.
-            // However, if we delete only static colums, it's fine since we won't really use the prefix anyway.
-            for (Operation deletion : deletions)
-                if (!deletion.column.isStatic())
-                    throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, deletion.column.name));
-        }
-
-        if (deletions.isEmpty())
-        {
-            // We delete the slice selected by the prefix.
-            // However, for performance reasons, we distinguish 2 cases:
-            //   - It's a full internal row delete
-            //   - It's a full cell name (i.e it's a dense layout and the prefix is full)
-            if (prefix.isEmpty())
+            // We're not deleting any specific columns so it's either a full partition deletion ...
+            if (cbuilder.count() == 0)
             {
-                // No columns specified, delete the row
-                cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
+                update.addPartitionDeletion(params.deletionTime());
             }
-            else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size())
+            // ... or a row deletion ...
+            else if (cbuilder.remainingCount() == 0)
             {
-                cf.addAtom(params.makeTombstone(cfm.comparator.create(prefix, null)));
+                Clustering clustering = cbuilder.build();
+                Row.Writer writer = update.writer();
+                params.writeClustering(clustering, writer);
+                params.writeRowDeletion(writer);
+                writer.endOfRow();
             }
+            // ... or a range of rows deletion.
             else
             {
-                cf.addAtom(params.makeRangeTombstone(prefix.slice()));
+                update.addRangeTombstone(params.makeRangeTombstone(cbuilder));
             }
         }
         else
         {
-            for (Operation op : deletions)
-                op.execute(key, cf, prefix, params);
+            if (!regularDeletions.isEmpty())
+            {
+                // We can't delete specific (regular) columns if not all clustering columns have been specified.
+                if (cbuilder.remainingCount() > 0)
+                    throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, regularDeletions.get(0).column.name));
+
+                Clustering clustering = cbuilder.build();
+                Row.Writer writer = update.writer();
+                params.writeClustering(clustering, writer);
+                for (Operation op : regularDeletions)
+                    op.execute(update.partitionKey(), clustering, writer, params);
+                writer.endOfRow();
+            }
+
+            if (!staticDeletions.isEmpty())
+            {
+                Row.Writer writer = update.staticWriter();
+                for (Operation op : staticDeletions)
+                    op.execute(update.partitionKey(), Clustering.STATIC_CLUSTERING, writer, params);
+                writer.endOfRow();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index ed10d00..8ad4f6c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -119,12 +119,6 @@ public class DropTypeStatement extends SchemaAlteringStatement
                 if (isUsedBy(subtype))
                     return true;
         }
-        else if (toCheck instanceof ColumnToCollectionType)
-        {
-            for (CollectionType collection : ((ColumnToCollectionType)toCheck).defined.values())
-                if (isUsedBy(collection))
-                    return true;
-        }
         else if (toCheck instanceof CollectionType)
         {
             if (toCheck instanceof ListType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 888cdb7..6a6d186 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,7 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
@@ -33,19 +34,20 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
 import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
 import org.apache.cassandra.cql3.selection.Selection;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompositesBuilder;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.triggers.TriggerExecutor;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -58,6 +60,8 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
  */
 public abstract class ModificationStatement implements CQLStatement
 {
+    protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+
     private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
 
     public static enum StatementType { INSERT, UPDATE, DELETE }
@@ -68,7 +72,15 @@ public abstract class ModificationStatement implements CQLStatement
     public final Attributes attrs;
 
     protected final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<>();
-    private final List<Operation> columnOperations = new ArrayList<Operation>();
+    private final List<Operation> regularOperations = new ArrayList<>();
+    private final List<Operation> staticOperations = new ArrayList<>();
+
+    // TODO: If we had a builder for this statement, we could have updatedColumns/conditionColumns final and only have
+    // updatedColumnsBuilder/conditionColumnsBuilder in the builder ...
+    private PartitionColumns updatedColumns;
+    private PartitionColumns.Builder updatedColumnsBuilder = PartitionColumns.builder();
+    private PartitionColumns conditionColumns;
+    private PartitionColumns.Builder conditionColumnsBuilder = PartitionColumns.builder();
 
     // Separating normal and static conditions makes things somewhat easier
     private List<ColumnCondition> columnConditions;
@@ -103,25 +115,24 @@ public abstract class ModificationStatement implements CQLStatement
         Iterable<Function> functions = attrs.getFunctions();
 
         for (Restriction restriction : processedKeys.values())
-                functions = Iterables.concat(functions, restriction.getFunctions());
-
-        if (columnOperations != null)
-            for (Operation operation : columnOperations)
-                functions = Iterables.concat(functions, operation.getFunctions());
+            functions = Iterables.concat(functions, restriction.getFunctions());
 
-        if (columnConditions != null)
-            for (ColumnCondition condition : columnConditions)
-                functions = Iterables.concat(functions, condition.getFunctions());
+        for (Operation operation : allOperations())
+            functions = Iterables.concat(functions, operation.getFunctions());
 
-        if (staticConditions != null)
-            for (ColumnCondition condition : staticConditions)
-                functions = Iterables.concat(functions, condition.getFunctions());
+        for (ColumnCondition condition : allConditions())
+            functions = Iterables.concat(functions, condition.getFunctions());
 
         return functions;
     }
 
+    public boolean hasNoClusteringColumns()
+    {
+        return hasNoClusteringColumns;
+    }
+
     public abstract boolean requireFullClusteringKey();
-    public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
+    public abstract void addUpdateForKey(PartitionUpdate update, CBuilder clusteringBuilder, UpdateParameters params) throws InvalidRequestException;
 
     public int getBoundTerms()
     {
@@ -184,16 +195,75 @@ public abstract class ModificationStatement implements CQLStatement
 
     public void addOperation(Operation op)
     {
+        updatedColumnsBuilder.add(op.column);
+        // If the operation requires a read-before-write and we're doing a conditional read, we want to read
+        // the affected column as part of the read-for-conditions paxos phase (see #7499).
+        if (op.requiresRead())
+            conditionColumnsBuilder.add(op.column);
+
         if (op.column.isStatic())
+        {
             setsStaticColumns = true;
+            staticOperations.add(op);
+        }
         else
+        {
             setsRegularColumns = true;
-        columnOperations.add(op);
+            regularOperations.add(op);
+        }
+    }
+
+    public PartitionColumns updatedColumns()
+    {
+        return updatedColumns;
+    }
+
+    public PartitionColumns conditionColumns()
+    {
+        return conditionColumns;
     }
 
-    public List<Operation> getOperations()
+    public boolean updatesRegularRows()
     {
-        return columnOperations;
+        // We're updating regular rows if all the clustering columns are provided.
+        // Note that the only case where we're allowed not to provide clustering
+        // columns is if we set some static columns, and in that case no clustering
+        // columns should be given. So in practice, it's enough to check whether
+        // the table has no clustering columns or at least one of them is set.
+        return cfm.clusteringColumns().isEmpty() || !hasNoClusteringColumns;
+    }
+
+    public boolean updatesStaticRow()
+    {
+        return !staticOperations.isEmpty();
+    }
+
+    private void finishPreparation()
+    {
+        updatedColumns = updatedColumnsBuilder.build();
+        // Compact tables have no row marker. So if we don't actually update any particular column,
+        // this means that we're only updating the PK, which we allow if only those were declared in
+        // the definition. In that case however, we do want to write the compactValueColumn (since again
+        // we can't use a "row marker") so add it automatically.
+        if (cfm.isCompactTable() && updatedColumns.isEmpty() && updatesRegularRows())
+            updatedColumns = cfm.partitionColumns();
+
+        conditionColumns = conditionColumnsBuilder.build();
+    }
+
+    public List<Operation> getRegularOperations()
+    {
+        return regularOperations;
+    }
+
+    public List<Operation> getStaticOperations()
+    {
+        return staticOperations;
+    }
+
+    public Iterable<Operation> allOperations()
+    {
+        return Iterables.concat(staticOperations, regularOperations);
     }
 
     public Iterable<ColumnDefinition> getColumnsWithConditions()
@@ -205,8 +275,19 @@ public abstract class ModificationStatement implements CQLStatement
                                 staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
     }
 
+    public Iterable<ColumnCondition> allConditions()
+    {
+        if (staticConditions == null)
+            return columnConditions == null ? Collections.<ColumnCondition>emptySet(): columnConditions;
+        if (columnConditions == null)
+            return staticConditions;
+        return Iterables.concat(staticConditions, columnConditions);
+    }
+
     public void addCondition(ColumnCondition cond)
     {
+        conditionColumnsBuilder.add(cond.column);
+
         List<ColumnCondition> conds = null;
         if (cond.column.isStatic())
         {
@@ -255,7 +336,7 @@ public abstract class ModificationStatement implements CQLStatement
 
     public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException
     {
-        addKeyValues(def, new SingleColumnRestriction.EQ(def, value));
+        addKeyValues(def, new SingleColumnRestriction.EQRestriction(def, value));
     }
 
     public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
@@ -303,26 +384,25 @@ public abstract class ModificationStatement implements CQLStatement
     public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
     throws InvalidRequestException
     {
-        CompositesBuilder keyBuilder = new CompositesBuilder(cfm.getKeyValidatorAsCType());
+        MultiCBuilder keyBuilder = MultiCBuilder.create(cfm.getKeyValidatorAsClusteringComparator());
         for (ColumnDefinition def : cfm.partitionKeyColumns())
         {
             Restriction r = checkNotNull(processedKeys.get(def.name), "Missing mandatory PRIMARY KEY part %s", def.name);
             r.appendTo(keyBuilder, options);
         }
 
-        return Lists.transform(keyBuilder.build(), new com.google.common.base.Function<Composite, ByteBuffer>()
+        NavigableSet<Clustering> clusterings = keyBuilder.build();
+        List<ByteBuffer> keys = new ArrayList<ByteBuffer>(clusterings.size());
+        for (Clustering clustering : clusterings)
         {
-            @Override
-            public ByteBuffer apply(Composite composite)
-            {
-                ByteBuffer byteBuffer = composite.toByteBuffer();
-                ThriftValidation.validateKey(cfm, byteBuffer);
-                return byteBuffer;
-            }
-        });
+            ByteBuffer key = CFMetaData.serializePartitionKey(clustering);
+            ThriftValidation.validateKey(cfm, key);
+            keys.add(key);
+        }
+        return keys;
     }
 
-    public Composite createClusteringPrefix(QueryOptions options)
+    public CBuilder createClustering(QueryOptions options)
     throws InvalidRequestException
     {
         // If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -339,7 +419,7 @@ public abstract class ModificationStatement implements CQLStatement
         {
             // If we set no non-static columns, then it's fine not to have clustering columns
             if (hasNoClusteringColumns)
-                return cfm.comparator.staticPrefix();
+                return CBuilder.STATIC_BUILDER;
 
             // If we do have clustering columns however, then either it's an INSERT and the query is valid
             // but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject
@@ -354,13 +434,15 @@ public abstract class ModificationStatement implements CQLStatement
             }
         }
 
-        return createClusteringPrefixBuilderInternal(options);
+        return createClusteringInternal(options);
     }
 
-    private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
+    private CBuilder createClusteringInternal(QueryOptions options)
     throws InvalidRequestException
     {
-        CompositesBuilder builder = new CompositesBuilder(cfm.comparator);
+        CBuilder builder = CBuilder.create(cfm.comparator);
+        MultiCBuilder multiBuilder = MultiCBuilder.wrap(builder);
+
         ColumnDefinition firstEmptyKey = null;
         for (ColumnDefinition def : cfm.clusteringColumns())
         {
@@ -368,7 +450,7 @@ public abstract class ModificationStatement implements CQLStatement
             if (r == null)
             {
                 firstEmptyKey = def;
-                checkFalse(requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound(), 
+                checkFalse(requireFullClusteringKey() && !cfm.isDense() && cfm.isCompound(),
                            "Missing mandatory PRIMARY KEY part %s", def.name);
             }
             else if (firstEmptyKey != null)
@@ -377,10 +459,10 @@ public abstract class ModificationStatement implements CQLStatement
             }
             else
             {
-                r.appendTo(builder, options);
+                r.appendTo(multiBuilder, options);
             }
         }
-        return builder.build().get(0); // We only allow IN for row keys so far
+        return builder;
     }
 
     protected ColumnDefinition getFirstEmptyKey()
@@ -396,14 +478,14 @@ public abstract class ModificationStatement implements CQLStatement
     public boolean requiresRead()
     {
         // Lists SET operation incurs a read.
-        for (Operation op : columnOperations)
+        for (Operation op : allOperations())
             if (op.requiresRead())
                 return true;
 
         return false;
     }
 
-    protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
+    protected Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys, CBuilder cbuilder, boolean local, ConsistencyLevel cl)
     throws RequestExecutionException, RequestValidationException
     {
         if (!requiresRead())
@@ -418,32 +500,54 @@ public abstract class ModificationStatement implements CQLStatement
             throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
         }
 
-        ColumnSlice[] slices = new ColumnSlice[]{ clusteringPrefix.slice() };
-        List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
-        long now = System.currentTimeMillis();
+        // TODO: no point in recomputing that every time. Should move to preparation phase.
+        PartitionColumns.Builder builder = PartitionColumns.builder();
+        for (Operation op : allOperations())
+            if (op.requiresRead())
+                builder.add(op.column);
+
+        PartitionColumns toRead = builder.build();
+
+        NavigableSet<Clustering> clusterings = FBUtilities.singleton(cbuilder.build(), cfm.comparator);
+        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(partitionKeys.size());
+        int nowInSec = FBUtilities.nowInSeconds();
         for (ByteBuffer key : partitionKeys)
-            commands.add(new SliceFromReadCommand(keyspace(),
-                                                  key,
-                                                  columnFamily(),
-                                                  now,
-                                                  new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
-
-        List<Row> rows = local
-                       ? SelectStatement.readLocally(keyspace(), commands)
-                       : StorageProxy.read(commands, cl);
-
-        Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
-        for (Row row : rows)
+            commands.add(new SinglePartitionNamesCommand(cfm,
+                                                         nowInSec,
+                                                         ColumnFilter.selection(toRead),
+                                                         RowFilter.NONE,
+                                                         DataLimits.NONE,
+                                                         StorageService.getPartitioner().decorateKey(key),
+                                                         new ClusteringIndexNamesFilter(clusterings, false)));
+
+        Map<DecoratedKey, Partition> map = new HashMap();
+
+        SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+
+        if (local)
+        {
+            try (ReadOrderGroup orderGroup = group.startOrderGroup(); PartitionIterator iter = group.executeInternal(orderGroup))
+            {
+                return asMaterializedMap(iter);
+            }
+        }
+        else
         {
-            if (row.cf == null || row.cf.isEmpty())
-                continue;
+            try (PartitionIterator iter = group.execute(cl, null))
+            {
+                return asMaterializedMap(iter);
+            }
+        }
+    }
 
-            Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
-            if (iter.hasNext())
+    private Map<DecoratedKey, Partition> asMaterializedMap(PartitionIterator iterator)
+    {
+        Map<DecoratedKey, Partition> map = new HashMap();
+        while (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
             {
-                map.put(row.key.getKey(), iter.next());
-                // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
-                assert !iter.hasNext();
+                map.put(partition.partitionKey(), FilteredPartition.create(partition));
             }
         }
         return map;
@@ -492,14 +596,16 @@ public abstract class ModificationStatement implements CQLStatement
     {
         CQL3CasRequest request = makeCasRequest(queryState, options);
 
-        ColumnFamily result = StorageProxy.cas(keyspace(),
-                                               columnFamily(),
-                                               request.key,
-                                               request,
-                                               options.getSerialConsistency(),
-                                               options.getConsistency(),
-                                               queryState.getClientState());
-        return new ResultMessage.Rows(buildCasResultSet(request.key, result, options));
+        try (RowIterator result = StorageProxy.cas(keyspace(),
+                                                   columnFamily(),
+                                                   request.key,
+                                                   request,
+                                                   options.getSerialConsistency(),
+                                                   options.getConsistency(),
+                                                   queryState.getClientState()))
+        {
+            return new ResultMessage.Rows(buildCasResultSet(result, options));
+        }
     }
 
     private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options)
@@ -509,54 +615,54 @@ public abstract class ModificationStatement implements CQLStatement
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
-        ByteBuffer key = keys.get(0);
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0));
         long now = options.getTimestamp(queryState);
-        Composite prefix = createClusteringPrefix(options);
+        CBuilder cbuilder = createClustering(options);
 
-        CQL3CasRequest request = new CQL3CasRequest(cfm, key, false);
-        addConditions(prefix, request, options);
-        request.addRowUpdate(prefix, this, options, now);
+        CQL3CasRequest request = new CQL3CasRequest(cfm, key, false, conditionColumns(), updatesRegularRows(), updatesStaticRow());
+        addConditions(cbuilder.build(), request, options);
+        request.addRowUpdate(cbuilder, this, options, now);
         return request;
     }
 
-    public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
+    public void addConditions(Clustering clustering, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
     {
         if (ifNotExists)
         {
             // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
-            // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence
+            // columns and the prefix should be the clustering. But if only static columns are set, then the ifNotExists apply to the existence
             // of any static columns and we should use the prefix for the "static part" of the partition.
-            request.addNotExist(clusteringPrefix);
+            request.addNotExist(clustering);
         }
         else if (ifExists)
         {
-            request.addExist(clusteringPrefix);
+            request.addExist(clustering);
         }
         else
         {
             if (columnConditions != null)
-                request.addConditions(clusteringPrefix, columnConditions, options);
+                request.addConditions(clustering, columnConditions, options);
             if (staticConditions != null)
-                request.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
+                request.addConditions(Clustering.STATIC_CLUSTERING, staticConditions, options);
         }
     }
 
-    private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf, QueryOptions options) throws InvalidRequestException
+    private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException
     {
-        return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false, options);
+        return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options);
     }
 
-    public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
+    public static ResultSet buildCasResultSet(String ksName, String tableName, RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
     throws InvalidRequestException
     {
-        boolean success = cf == null;
+        boolean success = partition == null;
 
-        ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
+        ColumnSpecification spec = new ColumnSpecification(ksName, tableName, CAS_RESULT_COLUMN, BooleanType.instance);
         ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec));
         List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
 
         ResultSet rs = new ResultSet(metadata, rows);
-        return success ? rs : merge(rs, buildCasFailureResultSet(key, cf, columnsWithConditions, isBatch, options));
+        return success ? rs : merge(rs, buildCasFailureResultSet(partition, columnsWithConditions, isBatch, options));
     }
 
     private static ResultSet merge(ResultSet left, ResultSet right)
@@ -582,10 +688,10 @@ public abstract class ModificationStatement implements CQLStatement
         return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
     }
 
-    private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
+    private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
     throws InvalidRequestException
     {
-        CFMetaData cfm = cf.metadata();
+        CFMetaData cfm = partition.metadata();
         Selection selection;
         if (columnsWithConditions == null)
         {
@@ -609,9 +715,8 @@ public abstract class ModificationStatement implements CQLStatement
 
         }
 
-        long now = System.currentTimeMillis();
-        Selection.ResultSetBuilder builder = selection.resultSetBuilder(now, false);
-        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder);
+        Selection.ResultSetBuilder builder = selection.resultSetBuilder(false);
+        SelectStatement.forSelection(cfm, selection).processPartition(partition, options, builder, FBUtilities.nowInSeconds());
 
         return builder.build(options.getProtocolVersion());
     }
@@ -640,31 +745,31 @@ public abstract class ModificationStatement implements CQLStatement
     public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         CQL3CasRequest request = makeCasRequest(state, options);
-        ColumnFamily result = casInternal(request, state);
-        return new ResultMessage.Rows(buildCasResultSet(request.key, result, options));
+        try (RowIterator result = casInternal(request, state))
+        {
+            return new ResultMessage.Rows(buildCasResultSet(result, options));
+        }
     }
 
-    static ColumnFamily casInternal(CQL3CasRequest request, QueryState state)
+    static RowIterator casInternal(CQL3CasRequest request, QueryState state)
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
         CFMetaData metadata = Schema.instance.getCFMetaData(request.cfm.ksName, request.cfm.cfName);
 
-        ReadCommand readCommand = ReadCommand.create(request.cfm.ksName, request.key, request.cfm.cfName, request.now, request.readFilter());
-        Keyspace keyspace = Keyspace.open(request.cfm.ksName);
-
-        Row row = readCommand.getRow(keyspace);
-        ColumnFamily current = row.cf;
-        if (!request.appliesTo(current))
+        SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+        FilteredPartition current;
+        try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup))
         {
-            if (current == null)
-                current = ArrayBackedSortedColumns.factory.create(metadata);
-            return current;
+            current = FilteredPartition.create(PartitionIterators.getOnlyElement(iter, readCommand));
         }
 
-        ColumnFamily updates = request.makeUpdates(current);
-        updates = TriggerExecutor.instance.execute(request.key, updates);
+        if (!request.appliesTo(current))
+            return current.rowIterator();
+
+        PartitionUpdate updates = request.makeUpdates(current);
+        updates = TriggerExecutor.instance.execute(updates);
 
-        Commit proposal = Commit.newProposal(request.key, ballot, updates);
+        Commit proposal = Commit.newProposal(ballot, updates);
         proposal.makeMutation().apply();
         return null;
     }
@@ -683,17 +788,18 @@ public abstract class ModificationStatement implements CQLStatement
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
-        Composite clusteringPrefix = createClusteringPrefix(options);
+        CBuilder clustering = createClustering(options);
 
-        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
+        UpdateParameters params = makeUpdateParameters(keys, clustering, options, local, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size());
         for (ByteBuffer key: keys)
         {
             ThriftValidation.validateKey(cfm, key);
-            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
-            addUpdateForKey(cf, key, clusteringPrefix, params);
-            Mutation mut = new Mutation(cfm.ksName, key, cf);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1);
+            addUpdateForKey(upd, clustering, params);
+            Mutation mut = new Mutation(upd);
 
             mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
         }
@@ -701,15 +807,15 @@ public abstract class ModificationStatement implements CQLStatement
     }
 
     public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
-                                                 Composite prefix,
+                                                 CBuilder clustering,
                                                  QueryOptions options,
                                                  boolean local,
                                                  long now)
     throws RequestExecutionException, RequestValidationException
     {
         // Some lists operation requires reading
-        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
-        return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
+        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, clustering, local, options.getConsistency());
+        return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), lists, true);
     }
 
     /**
@@ -803,6 +909,8 @@ public abstract class ModificationStatement implements CQLStatement
 
                 stmt.validateWhereClauseForConditions();
             }
+
+            stmt.finishPreparation();
             return stmt;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index e2708cd..6a7f429 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -23,7 +23,8 @@ import java.util.*;
 import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
@@ -34,7 +35,8 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.selection.Selection;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -43,12 +45,9 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,6 +70,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
  */
 public class SelectStatement implements CQLStatement
 {
+    private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
+
     private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
 
     private final int boundTerms;
@@ -88,6 +89,8 @@ public class SelectStatement implements CQLStatement
      */
     private final Comparator<List<ByteBuffer>> orderingComparator;
 
+    private final ColumnFilter queriedColumns;
+
     // Used by forSelection below
     private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier.Raw, Boolean>emptyMap(), false, false, false);
 
@@ -108,6 +111,7 @@ public class SelectStatement implements CQLStatement
         this.orderingComparator = orderingComparator;
         this.parameters = parameters;
         this.limit = limit;
+        this.queriedColumns = gatherQueriedColumns();
     }
 
     public Iterable<Function> getFunctions()
@@ -117,6 +121,23 @@ public class SelectStatement implements CQLStatement
                                 limit != null ? limit.getFunctions() : Collections.<Function>emptySet());
     }
 
+    // Note that the set of columns queried internally differs from the set selected by the
+    // user, as it also includes any column on which we have a restriction.
+    private ColumnFilter gatherQueriedColumns()
+    {
+        if (selection.isWildcard())
+            return ColumnFilter.all(cfm);
+
+        ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfm);
+        // Adds all selected columns
+        for (ColumnDefinition def : selection.getColumns())
+            if (!def.isPrimaryKeyColumn())
+                builder.add(def);
+        // as well as any restricted column (so we can actually apply the restriction)
+        builder.addAll(restrictions.nonPKRestrictedColumns());
+        return builder.build();
+    }
+
     // Creates a simple select based on the given selection.
     // Note that the results select statement should not be used for actual queries, but only for processing already
     // queried data through processColumnFamily.
@@ -161,31 +182,16 @@ public class SelectStatement implements CQLStatement
 
         cl.validateForRead(keyspace());
 
-        int limit = getLimit(options);
-        long now = System.currentTimeMillis();
-        Pageable command = getPageableCommand(options, limit, now);
-        int pageSize = getPageSize(options);
+        int nowInSec = FBUtilities.nowInSeconds();
+        ReadQuery query = getQuery(options, nowInSec);
 
-        if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
-            return execute(command, options, limit, now, state);
-
-        QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
-        return execute(pager, options, limit, now, pageSize);
-    }
+        int pageSize = getPageSize(options);
 
-    private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
-    {
-        int limitForQuery = updateLimitForQuery(limit);
-        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
-            return getRangeCommand(options, limitForQuery, now);
+        if (pageSize <= 0 || query.limits().count() <= pageSize)
+            return execute(query, options, state, nowInSec);
 
-        List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
-        return commands == null ? null : new Pageable.ReadCommands(commands, limitForQuery);
-    }
-
-    public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException
-    {
-        return getPageableCommand(options, getLimit(options), System.currentTimeMillis());
+        QueryPager pager = query.getPager(options.getPagingState());
+        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
     }
 
     private int getPageSize(QueryOptions options)
@@ -201,103 +207,162 @@ public class SelectStatement implements CQLStatement
         return  pageSize;
     }
 
-    private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state)
-    throws RequestValidationException, RequestExecutionException
+    public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
+    {
+        DataLimits limit = getLimit(options);
+        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+            return getRangeCommand(options, limit, nowInSec);
+
+        return getSliceCommands(options, limit, nowInSec);
+    }
+
+    private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException
     {
-        List<Row> rows;
-        if (command == null)
+        try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState()))
         {
-            rows = Collections.<Row>emptyList();
+            return processResults(data, options, nowInSec);
         }
-        else
+    }
+
+    // Simple wrapper class to avoid some code duplication
+    private static abstract class Pager
+    {
+        protected QueryPager pager;
+
+        protected Pager(QueryPager pager)
+        {
+            this.pager = pager;
+        }
+
+        public static Pager forInternalQuery(QueryPager pager, ReadOrderGroup orderGroup)
+        {
+            return new InternalPager(pager, orderGroup);
+        }
+
+        public static Pager forDistributedQuery(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
+        {
+            return new NormalPager(pager, consistency, clientState);
+        }
+
+        public boolean isExhausted()
+        {
+            return pager.isExhausted();
+        }
+
+        public PagingState state()
+        {
+            return pager.state();
+        }
+
+        public abstract PartitionIterator fetchPage(int pageSize);
+
+        public static class NormalPager extends Pager
         {
-            rows = command instanceof Pageable.ReadCommands
-                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState())
-                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
+            private final ConsistencyLevel consistency;
+            private final ClientState clientState;
+
+            private NormalPager(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
+            {
+                super(pager);
+                this.consistency = consistency;
+                this.clientState = clientState;
+            }
+
+            public PartitionIterator fetchPage(int pageSize)
+            {
+                return pager.fetchPage(pageSize, consistency, clientState);
+            }
         }
 
-        return processResults(rows, options, limit, now);
+        public static class InternalPager extends Pager
+        {
+            private final ReadOrderGroup orderGroup;
+
+            private InternalPager(QueryPager pager, ReadOrderGroup orderGroup)
+            {
+                super(pager);
+                this.orderGroup = orderGroup;
+            }
+
+            public PartitionIterator fetchPage(int pageSize)
+            {
+                return pager.fetchPageInternal(pageSize, orderGroup);
+            }
+        }
     }
 
-    private ResultMessage.Rows execute(QueryPager pager, QueryOptions options, int limit, long now, int pageSize)
+    private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec)
     throws RequestValidationException, RequestExecutionException
     {
         if (selection.isAggregate())
-            return pageAggregateQuery(pager, options, pageSize, now);
+            return pageAggregateQuery(pager, options, pageSize, nowInSec);
 
         // We can't properly do post-query ordering if we page (see #6722)
         checkFalse(needsPostQueryOrdering(),
-                   "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
-                   + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
+                  "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
+                  + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
 
-        List<Row> page = pager.fetchPage(pageSize);
-        ResultMessage.Rows msg = processResults(page, options, limit, now);
+        ResultMessage.Rows msg;
+        try (PartitionIterator page = pager.fetchPage(pageSize))
+        {
+            msg = processResults(page, options, nowInSec);
+        }
 
+        // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
+        // shouldn't be moved inside the 'try' above.
         if (!pager.isExhausted())
             msg.result.metadata.setHasMorePages(pager.state());
 
         return msg;
     }
 
-    private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now)
-            throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows pageAggregateQuery(Pager pager, QueryOptions options, int pageSize, int nowInSec)
+    throws RequestValidationException, RequestExecutionException
     {
-        Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
+        Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
         while (!pager.isExhausted())
         {
-            for (org.apache.cassandra.db.Row row : pager.fetchPage(pageSize))
+            try (PartitionIterator iter = pager.fetchPage(pageSize))
             {
-                // Not columns match the query, skip
-                if (row.cf == null)
-                    continue;
-
-                processColumnFamily(row.key.getKey(), row.cf, options, now, result);
+                while (iter.hasNext())
+                    processPartition(iter.next(), options, result, nowInSec);
             }
         }
         return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
     }
 
-    public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
+    private ResultMessage.Rows processResults(PartitionIterator partitions, QueryOptions options, int nowInSec) throws RequestValidationException
     {
-        ResultSet rset = process(rows, options, limit, now);
+        ResultSet rset = process(partitions, options, nowInSec);
         return new ResultMessage.Rows(rset);
     }
 
-    static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds)
-    {
-        Keyspace keyspace = Keyspace.open(keyspaceName);
-        List<Row> rows = new ArrayList<Row>(cmds.size());
-        for (ReadCommand cmd : cmds)
-            rows.add(cmd.getRow(keyspace));
-        return rows;
-    }
-
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        int limit = getLimit(options);
-        long now = System.currentTimeMillis();
-        Pageable command = getPageableCommand(options, limit, now);
+        int nowInSec = FBUtilities.nowInSeconds();
+        ReadQuery query = getQuery(options, nowInSec);
         int pageSize = getPageSize(options);
 
-        if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
+        try (ReadOrderGroup orderGroup = query.startOrderGroup())
         {
-            List<Row> rows = command == null
-                             ? Collections.<Row>emptyList()
-                             : (command instanceof Pageable.ReadCommands
-                                ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
-                                : ((RangeSliceCommand)command).executeLocally());
-
-            return processResults(rows, options, limit, now);
+            if (pageSize <= 0 || query.limits().count() <= pageSize)
+            {
+                try (PartitionIterator data = query.executeInternal(orderGroup))
+                {
+                    return processResults(data, options, nowInSec);
+                }
+            }
+            else
+            {
+                QueryPager pager = query.getPager(options.getPagingState());
+                return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
+            }
         }
-
-        QueryPager pager = QueryPagers.localPager(command);
-        return execute(pager, options, limit, now, pageSize);
     }
 
-    public ResultSet process(List<Row> rows) throws InvalidRequestException
+    public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
     {
-        QueryOptions options = QueryOptions.DEFAULT;
-        return process(rows, options, getLimit(options), System.currentTimeMillis());
+        return process(partitions, QueryOptions.DEFAULT, nowInSec);
     }
 
     public String keyspace()
@@ -326,372 +391,239 @@ public class SelectStatement implements CQLStatement
         return restrictions;
     }
 
-    private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
+    private ReadQuery getSliceCommands(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
     {
         Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+        if (keys.isEmpty())
+            return ReadQuery.EMPTY;
 
-        List<ReadCommand> commands = new ArrayList<>(keys.size());
-
-        IDiskAtomFilter filter = makeFilter(options, limit);
+        ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
         if (filter == null)
-            return null;
+            return ReadQuery.EMPTY;
+
+        RowFilter rowFilter = getRowFilter(options);
 
         // Note that we use the total limit for every key, which is potentially inefficient.
         // However, IN + LIMIT is not a very sensible choice.
+        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size());
         for (ByteBuffer key : keys)
         {
             QueryProcessor.validateKey(key);
-            // We should not share the slice filter amongst the commands (hence the cloneShallow), due to
-            // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
-            // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
-            commands.add(ReadCommand.create(keyspace(), ByteBufferUtil.clone(key), columnFamily(), now, filter.cloneShallow()));
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key));
+            commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter));
         }
 
-        return commands;
+        return new SinglePartitionReadCommand.Group(commands, limit);
     }
 
-    private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
+    private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
     {
-        IDiskAtomFilter filter = makeFilter(options, limit);
-        if (filter == null)
-            return null;
+        ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options);
+        if (clusteringIndexFilter == null)
+            return ReadQuery.EMPTY;
 
-        List<IndexExpression> expressions = getValidatedIndexExpressions(options);
+        RowFilter rowFilter = getRowFilter(options);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        AbstractBounds<RowPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
+        AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
         return keyBounds == null
-             ? null
-             : new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
-    }
-
-    private ColumnSlice makeStaticSlice()
-    {
-        // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the
-        // same effect while saving a few CPU cycles.
-        return isReversed
-             ? new ColumnSlice(cfm.comparator.staticPrefix().end(), Composites.EMPTY)
-             : new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end());
+             ? ReadQuery.EMPTY
+             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
     }
 
-    private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
+    private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)
     throws InvalidRequestException
     {
-        int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
         if (parameters.isDistinct)
         {
-            // For distinct, we only care about fetching the beginning of each partition. If we don't have
-            // static columns, we in fact only care about the first cell, so we query only that (we don't "group").
-            // If we do have static columns, we do need to fetch the first full group (to have the static columns values).
-
-            // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for
-            // DISTINCT queries on the partition key only.
-            toGroup = selection.containsStaticColumns() ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
-            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup);
+            // We need to be able to distinguish between partitions having live rows and those that don't. But
+            // doing so is not trivial since "having a live row" depends potentially on
+            //   1) when the query is performed, due to TTLs
+            //   2) how things reconcile together between different nodes
+            // so that it's hard to really optimize properly internally. So to keep it simple, we simply query
+            // for the first row of the partition and hence use Slices.ALL. We'll limit it to the first live
+            // row however in getLimit().
+            return new ClusteringIndexSliceFilter(Slices.ALL, false);
         }
-        else if (restrictions.isColumnRange())
-        {
-            List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options);
-            List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options);
-            assert startBounds.size() == endBounds.size();
-
-            // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
-            // the part of the index to query so adding the static slice would be useless and confusing.
-            // For 2i, static columns are retrieve in CompositesSearcher with each index hit.
-            ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing()
-                                    ? makeStaticSlice()
-                                    : null;
-
-            // The case where startBounds == 1 is common enough that it's worth optimizing
-            if (startBounds.size() == 1)
-            {
-                ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0));
-                if (slice.isAlwaysEmpty(cfm.comparator, isReversed))
-                    return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
-
-                if (staticSlice == null)
-                    return sliceFilter(slice, limit, toGroup);
-
-                if (isReversed)
-                    return slice.includes(cfm.comparator.reverseComparator(), staticSlice.start)
-                            ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup)
-                            : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup);
-                else
-                    return slice.includes(cfm.comparator, staticSlice.finish)
-                            ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup)
-                            : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup);
-            }
-
-            List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
-            for (int i = 0; i < startBounds.size(); i++)
-            {
-                ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
-                if (!slice.isAlwaysEmpty(cfm.comparator, isReversed))
-                    l.add(slice);
-            }
 
-            if (l.isEmpty())
-                return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
-            if (staticSlice == null)
-                return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup);
+        if (restrictions.isColumnRange())
+        {
+            Slices slices = makeSlices(options);
+            if (slices == Slices.NONE && !selection.containsStaticColumns())
+                return null;
 
-            // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but if there is
-            // a static slice, it could overlap with the 2nd slice. Check for it and correct if that's the case
-            ColumnSlice[] slices;
-            if (isReversed)
-            {
-                if (l.get(l.size() - 1).includes(cfm.comparator.reverseComparator(), staticSlice.start))
-                {
-                    slices = l.toArray(new ColumnSlice[l.size()]);
-                    slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, Composites.EMPTY);
-                }
-                else
-                {
-                    slices = l.toArray(new ColumnSlice[l.size()+1]);
-                    slices[slices.length-1] = staticSlice;
-                }
-            }
-            else
-            {
-                if (l.get(0).includes(cfm.comparator, staticSlice.finish))
-                {
-                    slices = new ColumnSlice[l.size()];
-                    slices[0] = new ColumnSlice(Composites.EMPTY, l.get(0).finish);
-                    for (int i = 1; i < l.size(); i++)
-                        slices[i] = l.get(i);
-                }
-                else
-                {
-                    slices = new ColumnSlice[l.size()+1];
-                    slices[0] = staticSlice;
-                    for (int i = 0; i < l.size(); i++)
-                        slices[i+1] = l.get(i);
-                }
-            }
-            return sliceFilter(slices, limit, toGroup);
+            return new ClusteringIndexSliceFilter(slices, isReversed);
         }
         else
         {
-            SortedSet<CellName> cellNames = getRequestedColumns(options);
-            if (cellNames == null) // in case of IN () for the last column of the key
+            NavigableSet<Clustering> clusterings = getRequestedRows(options);
+            if (clusterings.isEmpty() && !selection.containsStaticColumns()) // in case of IN () for the last column of the key
                 return null;
-            QueryProcessor.validateCellNames(cellNames, cfm.comparator);
-            return new NamesQueryFilter(cellNames, true);
+
+            return new ClusteringIndexNamesFilter(clusterings, isReversed);
         }
     }
 
-    private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup)
+    private Slices makeSlices(QueryOptions options)
+    throws InvalidRequestException
     {
-        return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup);
-    }
+        SortedSet<Slice.Bound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
+        SortedSet<Slice.Bound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
+        assert startBounds.size() == endBounds.size();
 
-    private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup)
-    {
-        assert ColumnSlice.validateSlices(slices, cfm.comparator, isReversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (isReversed ? " (reversed)" : ""));
-        return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+        // The case where startBounds.size() == 1 is common enough that it's worth optimizing
+        if (startBounds.size() == 1)
+        {
+            Slice.Bound start = startBounds.first();
+            Slice.Bound end = endBounds.first();
+            return cfm.comparator.compare(start, end) > 0
+                 ? Slices.NONE
+                 : Slices.with(cfm.comparator, Slice.make(start, end));
+        }
+
+        Slices.Builder builder = new Slices.Builder(cfm.comparator, startBounds.size());
+        Iterator<Slice.Bound> startIter = startBounds.iterator();
+        Iterator<Slice.Bound> endIter = endBounds.iterator();
+        while (startIter.hasNext() && endIter.hasNext())
+        {
+            Slice.Bound start = startIter.next();
+            Slice.Bound end = endIter.next();
+
+            // Ignore slices that are nonsensical
+            if (cfm.comparator.compare(start, end) > 0)
+                continue;
+
+            builder.add(start, end);
+        }
+
+        return builder.build();
     }
 
     /**
      * May be used by custom QueryHandler implementations
      */
-    public int getLimit(QueryOptions options) throws InvalidRequestException
+    public DataLimits getLimit(QueryOptions options) throws InvalidRequestException
     {
-        if (limit != null)
+        int userLimit = -1;
+        // If we aggregate, the limit really applies to the number of rows returned to the user, not to what is queried, and
+        // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
+        // return 1 result and can therefore basically ignore the user LIMIT in this case.
+        // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus
+        // able to apply the user limit properly.
+        if (limit != null && !selection.isAggregate())
         {
             ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
             // treat UNSET limit value as 'unlimited'
-            if (b == UNSET_BYTE_BUFFER)
-                return Integer.MAX_VALUE;
-            try
-            {
-                Int32Type.instance.validate(b);
-                int l = Int32Type.instance.compose(b);
-                checkTrue(l > 0, "LIMIT must be strictly positive");
-                return l;
-            }
-            catch (MarshalException e)
+            if (b != UNSET_BYTE_BUFFER)
             {
-                throw new InvalidRequestException("Invalid limit value");
+                try
+                {
+                    Int32Type.instance.validate(b);
+                    userLimit = Int32Type.instance.compose(b);
+                    checkTrue(userLimit > 0, "LIMIT must be strictly positive");
+                }
+                catch (MarshalException e)
+                {
+                    throw new InvalidRequestException("Invalid limit value");
+                }
             }
         }
-        return Integer.MAX_VALUE;
-    }
 
-    private int updateLimitForQuery(int limit)
-    {
-        // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary
-        // and exclude it later (in processColumnFamily)
-        return restrictions.isNonCompositeSliceWithExclusiveBounds() && limit != Integer.MAX_VALUE
-             ? limit + 1
-             : limit;
+        if (parameters.isDistinct)
+            return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit);
+
+        return userLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(userLimit);
     }
 
-    private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
+    private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException
     {
         // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
         // we always do a slice for CQL3 tables, so it's ok to ignore them here
         assert !restrictions.isColumnRange();
-        SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
-        for (Composite composite : restrictions.getClusteringColumnsAsComposites(options))
-            columns.addAll(addSelectedColumns(composite));
-        return columns;
-    }
-
-    private SortedSet<CellName> addSelectedColumns(Composite prefix)
-    {
-        if (cfm.comparator.isDense())
-        {
-            return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator);
-        }
-        else
-        {
-            SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
-
-            // We need to query the selected column as well as the marker
-            // column (for the case where the row exists but has no columns outside the PK)
-            // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF"
-            // that don't have marker and for which we must query all columns instead
-            if (cfm.comparator.isCompound() && !cfm.isSuper())
-            {
-                // marker
-                columns.add(cfm.comparator.rowMarker(prefix));
-
-                // selected columns
-                for (ColumnDefinition def : selection.getColumns())
-                    if (def.isRegular() || def.isStatic())
-                        columns.add(cfm.comparator.create(prefix, def));
-            }
-            else
-            {
-                // We now that we're not composite so we can ignore static columns
-                for (ColumnDefinition def : cfm.regularColumns())
-                    columns.add(cfm.comparator.create(prefix, def));
-            }
-            return columns;
-        }
+        return restrictions.getClusteringColumns(options);
     }
 
     /**
      * May be used by custom QueryHandler implementations
      */
-    public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException
+    public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException
     {
-        if (!restrictions.usesSecondaryIndexing())
-            return Collections.emptyList();
-
         ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
         SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
-
-        List<IndexExpression> expressions = restrictions.getIndexExpressions(secondaryIndexManager, options);
-
-        secondaryIndexManager.validateIndexSearchersForQuery(expressions);
-
-        return expressions;
-    }
-
-    private CellName makeExclusiveSliceBound(Bound bound, CellNameType type, QueryOptions options) throws InvalidRequestException
-    {
-        if (restrictions.areRequestedBoundsInclusive(bound))
-            return null;
-
-       return type.makeCellName(restrictions.getClusteringColumnsBounds(bound, options).get(0));
+        RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options);
+        secondaryIndexManager.validateFilter(filter);
+        return filter;
     }
 
-    private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
+    private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException
     {
-        final CellNameType type = cfm.comparator;
-
-        final CellName excludedStart = makeExclusiveSliceBound(Bound.START, type, options);
-        final CellName excludedEnd = makeExclusiveSliceBound(Bound.END, type, options);
-
-        return Iterators.filter(cells, new Predicate<Cell>()
+        Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
+        while (partitions.hasNext())
         {
-            public boolean apply(Cell c)
+            try (RowIterator partition = partitions.next())
             {
-                // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
-                // the comparator is composite that is)), filter here
-                return !((excludedStart != null && type.compare(c.name(), excludedStart) == 0)
-                            || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0));
+                processPartition(partition, options, result, nowInSec);
             }
-        });
-    }
-
-    private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
-    {
-        Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
-        for (org.apache.cassandra.db.Row row : rows)
-        {
-            // Not columns match the query, skip
-            if (row.cf == null)
-                continue;
-
-            processColumnFamily(row.key.getKey(), row.cf, options, now, result);
         }
 
         ResultSet cqlRows = result.build(options.getProtocolVersion());
 
         orderResults(cqlRows);
 
-        // Internal calls always return columns in the comparator order, even when reverse was set
-        if (isReversed)
-            cqlRows.reverse();
-
-        // Trim result if needed to respect the user limit
-        cqlRows.trim(limit);
         return cqlRows;
     }
 
-    // Used by ModificationStatement for CAS operations
-    void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
-    throws InvalidRequestException
+    public static ByteBuffer[] getComponents(CFMetaData cfm, DecoratedKey dk)
     {
-        CFMetaData cfm = cf.metadata();
-        ByteBuffer[] keyComponents = null;
+        ByteBuffer key = dk.getKey();
         if (cfm.getKeyValidator() instanceof CompositeType)
         {
-            keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key);
+            return ((CompositeType)cfm.getKeyValidator()).split(key);
         }
         else
         {
-            keyComponents = new ByteBuffer[]{ key };
+            return new ByteBuffer[]{ key };
         }
+    }
 
-        Iterator<Cell> cells = cf.getSortedColumns().iterator();
-        if (restrictions.isNonCompositeSliceWithExclusiveBounds())
-            cells = applySliceRestriction(cells, options);
-
+    // Used by ModificationStatement for CAS operations
+    void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec)
+    throws InvalidRequestException
+    {
         int protocolVersion = options.getProtocolVersion();
-        CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
 
-        // If there is static columns but there is no non-static row, then provided the select was a full
-        // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
-        // then we want to include the static columns in the result set (and we're done).
-        CQL3Row staticRow = iter.getStaticRow();
-        if (staticRow != null && !iter.hasNext() && !restrictions.usesSecondaryIndexing() && restrictions.hasNoClusteringColumnsRestriction())
+        ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
+
+        Row staticRow = partition.staticRow().takeAlias();
+        // If there are no rows, then provided the select was a full partition selection
+        // (i.e. not a 2ndary index search and there was no condition on clustering columns),
+        // we want to include static columns and we're done.
+        if (!partition.hasNext())
         {
-            result.newRow(protocolVersion);
-            for (ColumnDefinition def : selection.getColumns())
+            if (!staticRow.isEmpty() && (!restrictions.usesSecondaryIndexing() || cfm.isStaticCompactTable()) && !restrictions.hasClusteringColumnsRestriction())
             {
-                switch (def.kind)
+                result.newRow(protocolVersion);
+                for (ColumnDefinition def : selection.getColumns())
                 {
-                    case PARTITION_KEY:
-                        result.add(keyComponents[def.position()]);
-                        break;
-                    case STATIC:
-                        addValue(result, def, staticRow, options);
-                        break;
-                    default:
-                        result.add((ByteBuffer)null);
+                    switch (def.kind)
+                    {
+                        case PARTITION_KEY:
+                            result.add(keyComponents[def.position()]);
+                            break;
+                        case STATIC:
+                            addValue(result, def, staticRow, nowInSec, protocolVersion);
+                            break;
+                        default:
+                            result.add((ByteBuffer)null);
+                    }
                 }
             }
             return;
         }
 
-        while (iter.hasNext())
+        while (partition.hasNext())
         {
-            CQL3Row cql3Row = iter.next();
-
-            // Respect requested order
+            Row row = partition.next();
             result.newRow(protocolVersion);
             // Respect selection order
             for (ColumnDefinition def : selection.getColumns())
@@ -702,41 +634,35 @@ public class SelectStatement implements CQLStatement
                         result.add(keyComponents[def.position()]);
                         break;
                     case CLUSTERING_COLUMN:
-                        result.add(cql3Row.getClusteringColumn(def.position()));
-                        break;
-                    case COMPACT_VALUE:
-                        result.add(cql3Row.getColumn(null));
+                        result.add(row.clustering().get(def.position()));
                         break;
                     case REGULAR:
-                        addValue(result, def, cql3Row, options);
+                        addValue(result, def, row, nowInSec, protocolVersion);
                         break;
                     case STATIC:
-                        addValue(result, def, staticRow, options);
+                        addValue(result, def, staticRow, nowInSec, protocolVersion);
                         break;
                 }
             }
         }
     }
 
-    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
+    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, Row row, int nowInSec, int protocolVersion)
     {
-        if (row == null)
+        if (def.isComplex())
         {
-            result.add((ByteBuffer)null);
-            return;
+            // Collections are the only complex types we have so far
+            assert def.type.isCollection() && def.type.isMultiCell();
+            Iterator<Cell> cells = row.getCells(def);
+            if (cells == null)
+                result.add((ByteBuffer)null);
+            else
+                result.add(((CollectionType)def.type).serializeForNativeProtocol(def, cells, protocolVersion));
         }
-
-        if (def.type.isMultiCell())
+        else
         {
-            List<Cell> cells = row.getMultiCellColumn(def.name);
-            ByteBuffer buffer = cells == null
-                             ? null
-                             : ((CollectionType)def.type).serializeForNativeProtocol(def, cells, options.getProtocolVersion());
-            result.add(buffer);
-            return;
+            result.add(row.getCell(def), nowInSec);
         }
-
-        result.add(row.getColumn(def.name));
     }
 
     private boolean needsPostQueryOrdering()
@@ -796,9 +722,6 @@ public class SelectStatement implements CQLStatement
                 isReversed = isReversed(cfm);
             }
 
-            if (isReversed)
-                restrictions.reverse();
-
             checkNeedsFiltering(restrictions);
 
             SelectStatement stmt = new SelectStatement(cfm,
@@ -832,7 +755,8 @@ public class SelectStatement implements CQLStatement
                                                  whereClause,
                                                  boundNames,
                                                  selection.containsOnlyStaticColumns(),
-                                                 selection.containsACollection());
+                                                 selection.containsACollection(),
+                                                 parameters.allowFiltering);
             }
             catch (UnrecognizedEntityException e)
             {
@@ -980,47 +904,12 @@ public class SelectStatement implements CQLStatement
             {
                 // We will potentially filter data if either:
                 //  - Have more than one IndexExpression
-                //  - Have no index expression and the column filter is not the identity
+                //  - Have no index expression and the row filter is not the identity
                 checkFalse(restrictions.needFiltering(),
                            "Cannot execute this query as it might involve data filtering and " +
                            "thus may have unpredictable performance. If you want to execute " +
                            "this query despite the performance unpredictability, use ALLOW FILTERING");
             }
-
-            // We don't internally support exclusive slice bounds on non-composite tables. To deal with it we do an
-            // inclusive slice and remove post-query the value that shouldn't be returned. One problem however is that
-            // if there is a user limit, that limit may make the query return before the end of the slice is reached,
-            // in which case, once we'll have removed bound post-query, we might end up with less results than
-            // requested which would be incorrect. For single-partition query, this is not a problem, we just ask for
-            // one more result (see updateLimitForQuery()) since that's enough to compensate for that problem. For key
-            // range however, each returned row may include one result that will have to be trimmed, so we would have
-            // to bump the query limit by N where N is the number of rows we will return, but we don't know that in
-            // advance. So, since we currently don't have a good way to handle such query, we refuse it (#7059) rather
-            // than answering with something that is wrong.
-            if (restrictions.isNonCompositeSliceWithExclusiveBounds() && restrictions.isKeyRange() && limit != null)
-            {
-                SingleColumnRelation rel = findInclusiveClusteringRelationForCompact(restrictions.cfm);
-                throw invalidRequest("The query requests a restriction of rows with a strict bound (%s) over a range of partitions. "
-                                   + "This is not supported by the underlying storage engine for COMPACT tables if a LIMIT is provided. "
-                                   + "Please either make the condition non strict (%s) or remove the user LIMIT", rel, rel.withNonStrictOperator());
-            }
-        }
-
-        private SingleColumnRelation findInclusiveClusteringRelationForCompact(CFMetaData cfm)
-        {
-            for (Relation r : whereClause)
-            {
-                // We only call this when sliceRestriction != null, i.e. for compact table with non composite comparator,
-                // so it can't be a MultiColumnRelation.
-                SingleColumnRelation rel = (SingleColumnRelation)r;
-
-                if (cfm.getColumnDefinition(rel.getEntity().prepare(cfm)).isClusteringColumn()
-                        && (rel.operator() == Operator.GT || rel.operator() == Operator.LT))
-                    return rel;
-            }
-
-            // We're not supposed to call this method unless we know this can't happen
-            throw new AssertionError();
         }
 
         private boolean containsAlias(final ColumnIdentifier name)