You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:41 UTC

[3/4] Add auto paging capability to the native protocol

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index c9f715c..c02da1d 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -45,18 +45,15 @@ public class RowIteratorFactory
      * and filtered by the queryfilter.
      * @param memtables Memtables pending flush.
      * @param sstables SStables to scan through.
-     * @param startWith Start at this key
-     * @param stopAt Stop and this key
-     * @param filter Used to decide which columns to pull out
+     * @param range The data range to fetch
      * @param cfs
      * @return A row iterator following all the given restrictions
      */
     public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
                                                      final Collection<SSTableReader> sstables,
-                                                     final RowPosition startWith,
-                                                     final RowPosition stopAt,
-                                                     final QueryFilter filter,
-                                                     final ColumnFamilyStore cfs)
+                                                     final DataRange range,
+                                                     final ColumnFamilyStore cfs,
+                                                     final long now)
     {
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
         final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -64,19 +61,19 @@ public class RowIteratorFactory
         // memtables
         for (Memtable memtable : memtables)
         {
-            iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(filter, memtable.getEntryIterator(startWith, stopAt)));
+            iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
         }
 
         for (SSTableReader sstable : sstables)
         {
-            final SSTableScanner scanner = sstable.getScanner(filter, startWith);
+            final SSTableScanner scanner = sstable.getScanner(range);
             iterators.add(scanner);
         }
 
         // reduce rows from all sources into a single row
         return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
         {
-            private final int gcBefore = cfs.gcBefore(filter.timestamp);
+            private final int gcBefore = cfs.gcBefore(now);
             private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
             private DecoratedKey key;
             private ColumnFamily returnCF;
@@ -96,17 +93,16 @@ public class RowIteratorFactory
 
             protected Row getReduced()
             {
-
                 // First check if this row is in the rowCache. If it is we can skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 if (cached == null)
                 {
                     // not cached: collate
-                    filter.collateOnDiskAtom(returnCF, colIters, gcBefore);
+                    QueryFilter.collateOnDiskAtom(returnCF, colIters, range.columnFilter(key.key), gcBefore, now);
                 }
                 else
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+                    QueryFilter keyFilter = new QueryFilter(key, cfs.name, range.columnFilter(key.key), now);
                     returnCF = cfs.filterColumnFamily(cached, keyFilter);
                 }
 
@@ -123,12 +119,12 @@ public class RowIteratorFactory
      */
     private static class ConvertToColumnIterator<T extends ColumnFamily> implements CloseableIterator<OnDiskAtomIterator>
     {
-        private final QueryFilter filter;
+        private final DataRange range;
         private final Iterator<Map.Entry<DecoratedKey, T>> iter;
 
-        public ConvertToColumnIterator(QueryFilter filter, Iterator<Map.Entry<DecoratedKey, T>> iter)
+        public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, T>> iter)
         {
-            this.filter = filter;
+            this.range = range;
             this.iter = iter;
         }
 
@@ -151,7 +147,7 @@ public class RowIteratorFactory
             {
                 public OnDiskAtomIterator create()
                 {
-                    return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey());
+                    return range.columnFilter(entry.getKey().key).getColumnFamilyIterator(entry.getKey(), entry.getValue());
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 5c42de5..508d1d2 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -106,6 +106,11 @@ public class SliceFromReadCommand extends ReadCommand
         return filter;
     }
 
+    public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
+    {
+        return new SliceFromReadCommand(table, key, cfName, timestamp, newFilter);
+    }
+
     /**
      * The original number of columns requested by the user.
      * This can be different from count when the slice command is a retry (see

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
deleted file mode 100644
index c1933ad..0000000
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.filter.*;
-
-public class SliceQueryPager implements Iterator<ColumnFamily>
-{
-    public static final int DEFAULT_PAGE_SIZE = 10000;
-
-    public final ColumnFamilyStore cfs;
-    public final DecoratedKey key;
-
-    private ColumnSlice[] slices;
-    private boolean exhausted;
-
-    public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
-    {
-        this.cfs = cfs;
-        this.key = key;
-        this.slices = slices;
-    }
-
-    // This will *not* do a query
-    public boolean hasNext()
-    {
-        return !exhausted;
-    }
-
-    // This might return an empty column family (but never a null one)
-    public ColumnFamily next()
-    {
-        if (exhausted)
-            return null;
-
-        long now = System.currentTimeMillis();
-        SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
-        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
-        ColumnFamily cf = cfs.getColumnFamily(filter);
-        if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
-        {
-            exhausted = true;
-        }
-        else
-        {
-            Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
-            Column lastColumn = iter.next();
-            while (lastColumn.isMarkedForDelete(now))
-                lastColumn = iter.next();
-
-            int i = 0;
-            for (; i < slices.length; ++i)
-            {
-                ColumnSlice current = slices[i];
-                if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
-                    break;
-            }
-            if (i >= slices.length)
-                exhausted = true;
-            else
-                slices = Arrays.copyOfRange(slices, i, slices.length);
-        }
-        return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 90bfd8d..8528515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -678,7 +678,8 @@ public class SystemTable
         return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
                                                      null,
                                                      new IdentityQueryFilter(),
-                                                     Integer.MAX_VALUE);
+                                                     Integer.MAX_VALUE,
+                                                     System.currentTimeMillis());
     }
 
     public static Collection<RowMutation> serializeSchema()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 772f842..409076f 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 
 /**
@@ -50,6 +51,7 @@ import org.apache.cassandra.tracing.Tracing;
 public class Table
 {
     public static final String SYSTEM_KS = "system";
+    private static final int DEFAULT_PAGE_SIZE = 10000;
 
     private static final Logger logger = LoggerFactory.getLogger(Table.class);
 
@@ -398,7 +400,7 @@ public class Table
         switchLock.readLock().lock();
         try
         {
-            SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
             while (pager.hasNext())
             {
                 ColumnFamily cf = pager.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index a5d54ce..2fda715 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -61,6 +61,16 @@ public class ColumnCounter
         return ignored;
     }
 
+    public ColumnCounter countAll(ColumnFamily container)
+    {
+        if (container == null)
+            return this;
+
+        for (Column c : container)
+            count(c, container);
+        return this;
+    }
+
     public static class GroupByPrefix extends ColumnCounter
     {
         private final CompositeType type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index af7431c..49503b5 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -89,6 +89,12 @@ public class ColumnSlice
         return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
     }
 
+    public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    {
+        return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+    }
+
+
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index d061ac2..e0f3ec3 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,47 +47,36 @@ public abstract class ExtendedFilter
 
     public final ColumnFamilyStore cfs;
     public final long timestamp;
-    protected final IDiskAtomFilter originalFilter;
+    public final DataRange dataRange;
     private final int maxResults;
     private final boolean countCQL3Rows;
-    private final boolean isPaging;
+    private volatile int currentLimit;
 
     public static ExtendedFilter create(ColumnFamilyStore cfs,
+                                        DataRange dataRange,
                                         List<IndexExpression> clause,
-                                        IDiskAtomFilter filter,
                                         int maxResults,
-                                        long timestamp,
                                         boolean countCQL3Rows,
-                                        boolean isPaging)
+                                        long timestamp)
     {
         if (clause == null || clause.isEmpty())
-        {
-            return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
-        }
-        else
-        {
-            if (isPaging)
-                throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
-            return cfs.getComparator() instanceof CompositeType
-                 ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
-                 : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
-        }
+            return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp);
+
+        return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp);
     }
 
-    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
+    protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp)
     {
         assert cfs != null;
-        assert filter != null;
+        assert dataRange != null;
         this.cfs = cfs;
-        this.originalFilter = filter;
+        this.dataRange = dataRange;
         this.maxResults = maxResults;
         this.timestamp = timestamp;
         this.countCQL3Rows = countCQL3Rows;
-        this.isPaging = isPaging;
+        this.currentLimit = maxResults;
         if (countCQL3Rows)
-            originalFilter.updateColumnsLimit(maxResults);
-        if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
-            throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
+            dataRange.updateColumnsLimit(maxResults);
     }
 
     public int maxRows()
@@ -98,37 +89,30 @@ public abstract class ExtendedFilter
         return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
     }
 
-    /**
-     * Update the filter if necessary given the number of column already
-     * fetched.
-     */
-    public void updateFilter(int currentColumnsCount)
+    public int currentLimit()
     {
-        // As soon as we'd done our first call, we want to reset the start column if we're paging
-        if (isPaging)
-            ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
-        if (!countCQL3Rows)
-            return;
+        return currentLimit;
+    }
 
-        int remaining = maxResults - currentColumnsCount;
-        initialFilter().updateColumnsLimit(remaining);
+    public IDiskAtomFilter columnFilter(ByteBuffer key)
+    {
+        return dataRange.columnFilter(key);
     }
 
     public int lastCounted(ColumnFamily data)
     {
-        if (initialFilter() instanceof SliceQueryFilter)
-            return ((SliceQueryFilter)initialFilter()).lastCounted();
-        else
-            return initialFilter().getLiveCount(data, timestamp);
+        return dataRange.getLiveCount(data, timestamp);
     }
 
-    /** The initial filter we'll do our first slice with (either the original or a superset of it) */
-    public abstract IDiskAtomFilter initialFilter();
-
-    public IDiskAtomFilter originalFilter()
+    public void updateFilter(int currentColumnsCount)
     {
-        return originalFilter;
+        if (!countCQL3Rows)
+            return;
+
+        currentLimit = maxResults - currentColumnsCount;
+        // We propagate that limit to the underlying filter so each internal query don't
+        // fetch more than we needs it to.
+        dataRange.updateColumnsLimit(currentLimit);
     }
 
     public abstract List<IndexExpression> getClause();
@@ -138,18 +122,18 @@ public abstract class ExtendedFilter
      * @param data the data retrieve by the initial filter
      * @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
      */
-    public abstract IDiskAtomFilter getExtraFilter(ColumnFamily data);
+    public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data);
 
     /**
      * @return data pruned down to the columns originally asked for
      */
-    public abstract ColumnFamily prune(ColumnFamily data);
+    public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
 
     /**
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder);
+    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
 
     public static boolean satisfies(int comparison, IndexOperator op)
     {
@@ -170,44 +154,54 @@ public abstract class ExtendedFilter
         }
     }
 
-    private static class FilterWithClauses extends ExtendedFilter
+    public static class WithClauses extends ExtendedFilter
     {
-        protected final List<IndexExpression> clause;
-        protected final IDiskAtomFilter initialFilter;
-
-        public FilterWithClauses(ColumnFamilyStore cfs,
-                                 List<IndexExpression> clause,
-                                 IDiskAtomFilter filter,
-                                 int maxResults,
-                                 long timestamp,
-                                 boolean countCQL3Rows)
+        private final List<IndexExpression> clause;
+        private final IDiskAtomFilter optimizedFilter;
+
+        public WithClauses(ColumnFamilyStore cfs,
+                           DataRange range,
+                           List<IndexExpression> clause,
+                           int maxResults,
+                           boolean countCQL3Rows,
+                           long timestamp)
         {
-            super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
+            super(cfs, range, maxResults, countCQL3Rows, timestamp);
             assert clause != null;
             this.clause = clause;
-            this.initialFilter = computeInitialFilter();
+            this.optimizedFilter = computeOptimizedFilter();
         }
 
-        /** Sets up the initial filter. */
-        protected IDiskAtomFilter computeInitialFilter()
+        /*
+         * Potentially optimize the column filter if we have a change to make it catch all clauses
+         * right away.
+         */
+        private IDiskAtomFilter computeOptimizedFilter()
         {
-            if (originalFilter instanceof SliceQueryFilter)
+            /*
+             * We shouldn't do the "optimization" for composites as the index names are not valid column names 
+             * (which the rest of the method assumes). Said optimization is not useful for composites anyway.
+             * We also don't want to do for paging ranges as the actual filter depends on the row key (it would
+             * probably be possible to make it work but we won't really use it so we don't bother).
+             */
+            if (cfs.getComparator() instanceof CompositeType || dataRange instanceof DataRange.Paging)
+                return null;
+
+            IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
+            if (filter instanceof SliceQueryFilter)
             {
                 // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
                 // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
                 if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
                 {
                     logger.trace("Expanding slice filter to entire row to cover additional expressions");
-                    return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                ((SliceQueryFilter) originalFilter).reversed,
-                                                Integer.MAX_VALUE);
+                    return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
                 }
             }
             else
             {
                 logger.trace("adding columns to original Filter to cover additional expressions");
-                assert originalFilter instanceof NamesQueryFilter;
+                assert filter instanceof NamesQueryFilter;
                 if (!clause.isEmpty())
                 {
                     SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
@@ -215,16 +209,17 @@ public abstract class ExtendedFilter
                     {
                         columns.add(expr.column_name);
                     }
-                    columns.addAll(((NamesQueryFilter) originalFilter).columns);
-                    return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns);
+                    columns.addAll(((NamesQueryFilter) filter).columns);
+                    return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
                 }
             }
-            return originalFilter;
+            return null;
         }
 
-        public IDiskAtomFilter initialFilter()
+        @Override
+        public IDiskAtomFilter columnFilter(ByteBuffer key)
         {
-            return initialFilter;
+            return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
         }
 
         public List<IndexExpression> getClause()
@@ -233,21 +228,13 @@ public abstract class ExtendedFilter
         }
 
         /*
-         * We may need an extra query only if the original was a slice query (and thus may have miss the expression for the clause).
-         * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
-         * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+         * We may need an extra query only if the original query wasn't selecting the row entirely.
+         * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
          */
-        private boolean needsExtraQuery(ColumnFamily data)
+        private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
         {
-            if (!(originalFilter instanceof SliceQueryFilter))
-                return false;
-
-            SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
-            // Check if we've fetch the whole row
-            if (filter.slices.length == 1
-             && filter.start().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-             && filter.finish().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-             && filter.count == Integer.MAX_VALUE)
+            IDiskAtomFilter filter = columnFilter(rowKey);
+            if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
                 return false;
 
             for (IndexExpression expr : clause)
@@ -261,9 +248,18 @@ public abstract class ExtendedFilter
             return false;
         }
 
-        public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+        public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
         {
-            if (!needsExtraQuery(data))
+            /*
+             * This method assumes the IndexExpression names are valid column names, which is not the
+             * case with composites. This is ok for now however since:
+             * 1) CompositeSearcher doesn't use it.
+             * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
+             * called by CFS.filter() for composites).
+             */
+            assert !(cfs.getComparator() instanceof CompositeType);
+
+            if (!needsExtraQuery(rowKey.key, data))
                 return null;
 
             // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
@@ -278,17 +274,19 @@ public abstract class ExtendedFilter
             return new NamesQueryFilter(columns);
         }
 
-        public ColumnFamily prune(ColumnFamily data)
+        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
         {
-            if (initialFilter == originalFilter)
+            if (optimizedFilter == null)
                 return data;
+
             ColumnFamily pruned = data.cloneMeShallow();
-            OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
-            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+            IDiskAtomFilter filter = dataRange.columnFilter(rowKey.key);
+            OnDiskAtomIterator iter = filter.getColumnFamilyIterator(rowKey, data);
+            filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
             return pruned;
         }
 
-        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
             // where the index returned a row which doesn't have the primary column when we actually read it
@@ -310,7 +308,7 @@ public abstract class ExtendedFilter
                 }
                 else
                 {
-                    dataValue = extractDataValue(def, rowKey, data, builder);
+                    dataValue = extractDataValue(def, rowKey.key, data, builder);
                     validator = def.getValidator();
                 }
 
@@ -346,68 +344,29 @@ public abstract class ExtendedFilter
         }
     }
 
-    private static class FilterWithCompositeClauses extends FilterWithClauses
-    {
-        public FilterWithCompositeClauses(ColumnFamilyStore cfs,
-                                          List<IndexExpression> clause,
-                                          IDiskAtomFilter filter,
-                                          int maxResults,
-                                          long timestamp,
-                                          boolean countCQL3Rows)
-        {
-            super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
-        }
-
-        /*
-         * For composites, the index name is not a valid column name (it's only
-         * one of the component), which means we should not do the
-         * NamesQueryFilter part of FilterWithClauses in particular.
-         * Besides, CompositesSearcher doesn't really use the initial filter
-         * expect to know the limit set by the user, so create a fake filter
-         * with only the count information.
-         */
-        protected IDiskAtomFilter computeInitialFilter()
-        {
-            int limit = originalFilter instanceof SliceQueryFilter
-                      ? ((SliceQueryFilter)originalFilter).count
-                      : Integer.MAX_VALUE;
-            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
-        }
-    }
-
     private static class EmptyClauseFilter extends ExtendedFilter
     {
-        public EmptyClauseFilter(ColumnFamilyStore cfs,
-                                 IDiskAtomFilter filter,
-                                 int maxResults,
-                                 long timestamp,
-                                 boolean countCQL3Rows,
-                                 boolean isPaging)
-        {
-            super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
-        }
-
-        public IDiskAtomFilter initialFilter()
+        public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
         {
-            return originalFilter;
+            super(cfs, range, maxResults, countCQL3Rows, timestamp);
         }
 
         public List<IndexExpression> getClause()
         {
-            throw new UnsupportedOperationException();
+            return Collections.<IndexExpression>emptyList();
         }
 
-        public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+        public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
         {
             return null;
         }
 
-        public ColumnFamily prune(ColumnFamily data)
+        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
         {
             return data;
         }
 
-        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 35f71e5..69a8950 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -41,10 +41,10 @@ import org.apache.cassandra.io.util.FileDataInput;
 public interface IDiskAtomFilter
 {
     /**
-     * returns an iterator that returns columns from the given memtable
+     * returns an iterator that returns columns from the given columnFamily
      * matching the Filter criteria in sorted order.
      */
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+    public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf);
 
     /**
      * Get an iterator that returns columns from the given SSTable using the opened file
@@ -74,6 +74,7 @@ public interface IDiskAtomFilter
     public void updateColumnsLimit(int newLimit);
 
     public int getLiveCount(ColumnFamily cf, long now);
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now);
 
     public IDiskAtomFilter cloneShallow();
     public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 297f227..d3f057d 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -27,6 +27,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -75,9 +76,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
        return new NamesQueryFilter(newColumns, countCQL3Rows);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf)
     {
-        return Memtable.getNamesIterator(key, cf, this);
+        assert cf != null;
+        return new ByNameColumnIterator(columns.iterator(), cf, key);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -120,6 +122,8 @@ public class NamesQueryFilter implements IDiskAtomFilter
 
     public int getLiveCount(ColumnFamily cf, long now)
     {
+        // Note: we could use columnCounter() but we save the object allocation as it's simple enough
+
         if (countCQL3Rows)
             return cf.hasOnlyTombstones(now) ? 0 : 1;
 
@@ -147,6 +151,56 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return true;
     }
 
+    public boolean countCQL3Rows()
+    {
+        return countCQL3Rows;
+    }
+
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+    {
+        return countCQL3Rows
+             ? new ColumnCounter.GroupByPrefix(now, null, 0)
+             : new ColumnCounter(now);
+    }
+
+    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+    {
+        private final ColumnFamily cf;
+        private final DecoratedKey key;
+        private final Iterator<ByteBuffer> iter;
+
+        public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
+        {
+            this.iter = iter;
+            this.cf = cf;
+            this.key = key;
+        }
+
+        public ColumnFamily getColumnFamily()
+        {
+            return cf;
+        }
+
+        public DecoratedKey getKey()
+        {
+            return key;
+        }
+
+        protected OnDiskAtom computeNext()
+        {
+            while (iter.hasNext())
+            {
+                ByteBuffer current = iter.next();
+                Column column = cf.getColumn(current);
+                if (column != null)
+                    return column;
+            }
+            return endOfData();
+        }
+
+        public void close() throws IOException { }
+    }
+
     public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
     {
         public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index ac0c632..4f71f3a 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -48,13 +48,13 @@ public class QueryFilter
         ColumnFamily cf = memtable.getColumnFamily(key);
         if (cf == null)
             return null;
-        return getMemtableColumnIterator(cf, key);
+        return getColumnFamilyIterator(cf);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(ColumnFamily cf)
     {
         assert cf != null;
-        return filter.getMemtableColumnIterator(cf, key);
+        return filter.getColumnFamilyIterator(key, cf);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
@@ -69,10 +69,15 @@ public class QueryFilter
 
     public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, final int gcBefore)
     {
+        collateOnDiskAtom(returnCF, toCollate, filter, gcBefore, timestamp);
+    }
+
+    public static void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
+    {
         List<Iterator<Column>> filteredIterators = new ArrayList<Iterator<Column>>(toCollate.size());
         for (Iterator<? extends OnDiskAtom> iter : toCollate)
             filteredIterators.add(gatherTombstones(returnCF, iter));
-        collateColumns(returnCF, filteredIterators, gcBefore);
+        collateColumns(returnCF, filteredIterators, filter, gcBefore, timestamp);
     }
 
     /**
@@ -84,7 +89,12 @@ public class QueryFilter
         filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
     }
 
-    public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
+    public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, int gcBefore)
+    {
+        collateColumns(returnCF, toCollate, filter, gcBefore, timestamp);
+    }
+
+    public static void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
     {
         final Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
         // define a 'reduced' iterator that merges columns w/ the same name, which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index b76ce04..9cbc49e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -95,14 +96,74 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
     }
 
+    public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+    {
+        Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+
+        List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+        boolean pastNewStart = false;
+        for (int i = 0; i < slices.length; i++)
+        {
+            ColumnSlice slice = slices[i];
+
+            if (pastNewStart)
+            {
+                newSlices.add(slice);
+                continue;
+            }
+
+            if (slices[i].isBefore(cmp, newStart))
+                continue;
+
+            if (slice.includes(cmp, newStart))
+                newSlices.add(new ColumnSlice(newStart, slice.finish));
+            else
+                newSlices.add(slice);
+
+            pastNewStart = true;
+        }
+        return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
+    }
+
     public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
     {
         return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(final DecoratedKey key, final ColumnFamily cf)
     {
-        return Memtable.getSliceIterator(key, cf, this);
+        assert cf != null;
+        final Iterator<Column> filteredIter = reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
+
+        return new OnDiskAtomIterator()
+        {
+            public ColumnFamily getColumnFamily()
+            {
+                return cf;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return key;
+            }
+
+            public boolean hasNext()
+            {
+                return filteredIter.hasNext();
+            }
+
+            public OnDiskAtom next()
+            {
+                return filteredIter.next();
+            }
+
+            public void close() throws IOException { }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -122,7 +183,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
-        columnCounter = getColumnCounter(container, now);
+        columnCounter = columnCounter(container.getComparator(), now);
 
         while (reducedColumns.hasNext())
         {
@@ -144,15 +205,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public int getLiveCount(ColumnFamily cf, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf, now);
-        for (Column column : cf)
-            counter.count(column, cf);
-        return counter.live();
+        return columnCounter(cf.getComparator(), now).countAll(cf).live();
     }
 
-    private ColumnCounter getColumnCounter(ColumnFamily container, long now)
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
     {
-        AbstractType<?> comparator = container.getComparator();
         if (compositesToGroup < 0)
             return new ColumnCounter(now);
         else if (compositesToGroup == 0)
@@ -163,7 +220,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public void trim(ColumnFamily cf, int trimTo, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf, now);
+        ColumnCounter counter = columnCounter(cf.getComparator(), now);
 
         Collection<Column> columns = reversed
                                    ? cf.getReverseSortedColumns()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 17ac81f..a40f4bd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -514,14 +515,9 @@ public class SecondaryIndexManager
      * @param columnFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
 
         if (indexSearchers.isEmpty())
             return Collections.emptyList();
@@ -530,7 +526,7 @@ public class SecondaryIndexManager
         if (indexSearchers.size() > 1)
             throw new RuntimeException("Unable to search across multiple secondary index types");
 
-        return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+        return indexSearchers.get(0).search(filter);
     }
 
     public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index ddd79dd..d28afc0 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -21,8 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 
@@ -39,12 +38,7 @@ public abstract class SecondaryIndexSearcher
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public abstract List<Row> search(AbstractBounds<RowPosition> range,
-                                     List<IndexExpression> clause,
-                                     IDiskAtomFilter dataFilter,
-                                     int maxResults,
-                                     long now,
-                                     boolean countCQL3Rows);
+    public abstract List<Row> search(ExtendedFilter filter);
 
     /**
      * @return true this index is able to handle given clauses.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e8c0a09..f9b7b11 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -44,16 +44,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
-        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+        assert filter.getClause() != null && !filter.getClause().isEmpty();
+        return baseCfs.filter(getIndexedIterator(filter), filter);
     }
 
     private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
@@ -62,10 +56,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
             return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         ColumnNameBuilder builder;
-        if (filter.originalFilter() instanceof SliceQueryFilter)
+        IDiskAtomFilter columnFilter = filter.columnFilter(key);
+        if (columnFilter instanceof SliceQueryFilter)
         {
-            SliceQueryFilter originalFilter = (SliceQueryFilter)filter.originalFilter();
-            builder = index.makeIndexColumnNameBuilder(key, isStart ? originalFilter.start() : originalFilter.finish());
+            SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
+            builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
         }
         else
         {
@@ -74,7 +69,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return isStart ? builder.build() : builder.buildAsEndOfRange();
     }
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -93,6 +88,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
          * possible key having a given token. A fix would be to actually store the token along the key in the
          * indexed row.
          */
+        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
         ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -140,12 +136,12 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                 DecoratedKey currentKey = null;
                 ColumnFamily data = null;
                 int columnsCount = 0;
-                int limit = ((SliceQueryFilter)filter.initialFilter()).count;
+                int limit = filter.currentLimit();
 
                 while (true)
                 {
                     // Did we got more columns that needed to respect the user limit?
-                    // (but we still need to return was fetch already)
+                    // (but we still need to return what was fetch already)
                     if (columnsCount > limit)
                         return makeReturn(currentKey, data);
 
@@ -235,7 +231,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         // Check if this entry cannot be a hit due to the original column filter
                         ByteBuffer start = entry.indexedEntryStart();
-                        if (!filter.originalFilter().maySelectPrefix(baseComparator, start))
+                        if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
                             continue;
 
                         logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
@@ -256,7 +252,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         assert newData != null : "An entry with not data should have been considered stale";
 
-                        if (!filter.isSatisfiedBy(dk.key, newData, entry.indexedEntryNameBuilder))
+                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
                             continue;
 
                         if (data == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index e919d8a..205efb7 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -47,20 +47,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
-        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+        assert filter.getClause() != null && !filter.getClause().isEmpty();
+        return baseCfs.filter(getIndexedIterator(filter), filter);
     }
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
     {
+
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
         // TODO: allow merge join instead of just one index + loop
@@ -79,6 +74,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
          * possible key having a given token. A fix would be to actually store the token along the key in the
          * indexed row.
          */
+        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
         final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -165,14 +161,14 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
 
                         // as in CFS.filter - extend the filter to ensure we include the columns
                         // from the index expressions, just in case they weren't included in the initialFilter
-                        IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+                        IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
                         if (extraFilter != null)
                         {
                             ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 85f2677..6c10d95 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -994,17 +995,12 @@ public class SSTableReader extends SSTable
 
     /**
      *
-     * @param filter filter to use when reading the columns
+     * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public SSTableScanner getScanner(QueryFilter filter)
+    public SSTableScanner getScanner(DataRange dataRange)
     {
-        return new SSTableScanner(this, filter, null);
-    }
-
-    public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
-    {
-        return new SSTableScanner(this, filter, startWith, null);
+        return new SSTableScanner(this, dataRange, null);
     }
 
     /**
@@ -1018,7 +1014,7 @@ public class SSTableReader extends SSTable
 
     public SSTableScanner getScanner(RateLimiter limiter)
     {
-        return new SSTableScanner(this, null, limiter);
+        return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
     }
 
     /**
@@ -1034,7 +1030,7 @@ public class SSTableReader extends SSTable
 
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         if (rangeIterator.hasNext())
-            return new SSTableScanner(this, null, range, limiter);
+            return new SSTableScanner(this, DataRange.forKeyRange(range), limiter);
         else
             return new EmptyCompactionScanner(getFilename());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index fb52a02..66e7189 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import com.google.common.util.concurrent.RateLimiter;
-
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
@@ -43,43 +44,46 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
+    private final DataRange dataRange;
+    private final long stopAt;
+
     protected Iterator<OnDiskAtomIterator> iterator;
-    private final QueryFilter filter;
-    private long stopAt;
 
     /**
      * @param sstable SSTable to scan; must not be null
-     * @param filter filter to use when scanning the columns; may be null
+     * @param filter range of data to fetch; must not be null
      * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
+    SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
         assert sstable != null;
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
-        this.filter = filter;
-        stopAt = dfile.length();
+        this.dataRange = dataRange;
+        this.stopAt = computeStopAt();
+        seekToStart();
     }
 
-    public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
+    private void seekToStart()
     {
-        this(sstable, filter, limiter);
+        if (dataRange.startKey().isMinimum(sstable.partitioner))
+            return;
 
-        long indexPosition = sstable.getIndexScanPosition(startWith);
+        long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
         // -1 means the key is before everything in the sstable. So just start from the beginning.
         if (indexPosition == -1)
-            indexPosition = 0;
-        ifile.seek(indexPosition);
+            return;
 
+        ifile.seek(indexPosition);
         try
         {
             while (!ifile.isEOF())
             {
                 indexPosition = ifile.getFilePointer();
                 DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(startWith);
+                int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
                 if (comparison >= 0)
                 {
                     // Found, just read the dataPosition and seek into index and data files
@@ -102,19 +106,14 @@ public class SSTableScanner implements ICompactionScanner
 
     }
 
-    public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+    private long computeStopAt()
     {
-        this(sstable, filter, range.toRowBounds().left, limiter);
+        AbstractBounds<RowPosition> keyRange = dataRange.keyRange();
+        if (dataRange.stopKey().isMinimum(sstable.partitioner) || (keyRange instanceof Range && ((Range)keyRange).isWrapAround()))
+            return dfile.length();
 
-        if (range.isWrapAround())
-        {
-            stopAt = dfile.length();
-        }
-        else
-        {
-            RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
-            stopAt = position == null ? dfile.length() : position.position;
-        }
+        RowIndexEntry position = sstable.getPosition(keyRange.toRowBounds().right, SSTableReader.Operator.GT);
+        return position == null ? dfile.length() : position.position;
     }
 
     public void close() throws IOException
@@ -202,8 +201,7 @@ public class SSTableScanner implements ICompactionScanner
                 }
 
                 assert !dfile.isEOF();
-
-                if (filter == null)
+                if (dataRange.selectsFullRowFor(currentKey.key))
                 {
                     dfile.seek(currentEntry.position);
                     ByteBufferUtil.readWithShortLength(dfile); // key
@@ -217,7 +215,7 @@ public class SSTableScanner implements ICompactionScanner
                 {
                     public OnDiskAtomIterator create()
                     {
-                        return filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+                        return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 43e2cb2..4ac408e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -116,6 +116,7 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PREPARE,
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
+        PAGED_RANGE,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -136,6 +137,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ, Stage.READ);
         put(Verb.RANGE_SLICE, Stage.READ);
         put(Verb.INDEX_SCAN, Stage.READ);
+        put(Verb.PAGED_RANGE, Stage.READ);
 
         put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
@@ -188,6 +190,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, RowMutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+        put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
         put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
@@ -216,6 +219,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+        put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);
         put(Verb.SNAPSHOT, null);
@@ -293,6 +297,7 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
                                                                    Verb.RANGE_SLICE,
+                                                                   Verb.PAGED_RANGE,
                                                                    Verb.REQUEST_RESPONSE);
 
     // total dropped message counts for server lifetime

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 6483d9b..38d103d 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -17,9 +17,17 @@
  */
 package org.apache.cassandra.service;
 
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -30,6 +38,7 @@ public class QueryState
     private final ClientState clientState;
     private volatile long clock;
     private volatile UUID preparedTracingSession;
+    private volatile Pager pager;
 
     public QueryState(ClientState clientState)
     {
@@ -68,6 +77,13 @@ public class QueryState
         this.preparedTracingSession = sessionId;
     }
 
+    public UUID getAndResetCurrentTracingSession()
+    {
+        UUID previous = preparedTracingSession;
+        preparedTracingSession = null;
+        return previous;
+    }
+
     public void createTracingSession()
     {
         if (this.preparedTracingSession == null)
@@ -77,9 +93,53 @@ public class QueryState
         else
         {
             UUID session = this.preparedTracingSession;
-            this.preparedTracingSession = null;
             Tracing.instance.newSession(session);
         }
     }
-}
 
+    public void attachPager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+    {
+        pager = new Pager(queryPager, statement, variables);
+    }
+
+    public boolean hasPager()
+    {
+        return pager != null;
+    }
+
+    public void dropPager()
+    {
+        pager = null;
+    }
+
+    public ResultMessage.Rows getNextPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        assert pager != null; // We've already validated (in ServerConnection) that this should not be null
+
+        int currentLimit = pager.queryPager.maxRemaining();
+        List<Row> page = pager.queryPager.fetchPage(pageSize);
+        ResultMessage.Rows msg = pager.statement.processResults(page, pager.variables, currentLimit, pager.queryPager.timestamp());
+
+        if (pager.queryPager.isExhausted())
+            dropPager();
+        else
+            msg.result.metadata.setHasMorePages();
+
+        return msg;
+    }
+
+    // Groups the actual query pager with the Select Query
+    private static class Pager
+    {
+        private final QueryPager queryPager;
+        private final SelectStatement statement;
+        private final List<ByteBuffer> variables;
+
+        private Pager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+        {
+            this.queryPager = queryPager;
+            this.statement = statement;
+            this.variables = variables;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f63fcb1..9a7d1d2 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,39 +19,16 @@ package org.apache.cassandra.service;
 
 import java.util.List;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.AbstractRangeCommand;
 import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.tracing.Tracing;
 
-public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
+public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
 {
-    public static List<Row> executeLocally(RangeSliceCommand command)
-    {
-        ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-        if (cfs.indexManager.hasIndexFor(command.row_filter))
-            return cfs.search(command.range,
-                              command.row_filter,
-                              command.predicate,
-                              command.maxResults,
-                              command.timestamp,
-                              command.countCQL3Rows);
-        else
-            return cfs.getRangeSlice(command.range,
-                                     command.row_filter,
-                                     command.predicate,
-                                     command.maxResults,
-                                     command.timestamp,
-                                     command.countCQL3Rows,
-                                     command.isPaging);
-    }
-
-    public void doVerb(MessageIn<RangeSliceCommand> message, int id)
+    public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
     {
         try
         {
@@ -60,7 +37,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
+            RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
             Tracing.trace("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 612f89b..0286bd3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1294,11 +1294,11 @@ public class StorageProxy implements StorageProxyMBean
 
     static class LocalRangeSliceRunnable extends DroppableRunnable
     {
-        private final RangeSliceCommand command;
+        private final AbstractRangeCommand command;
         private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
         private final long start = System.nanoTime();
 
-        LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+        LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
         {
             super(MessagingService.Verb.READ);
             this.command = command;
@@ -1307,7 +1307,7 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+            RangeSliceReply result = new RangeSliceReply(command.executeLocally());
             MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             handler.response(result);
         }
@@ -1337,7 +1337,7 @@ public class StorageProxy implements StorageProxyMBean
         return inter;
     }
 
-    public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
+    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
     throws UnavailableException, ReadTimeoutException
     {
         Tracing.trace("Determining replicas to query");
@@ -1348,11 +1348,9 @@ public class StorageProxy implements StorageProxyMBean
         // now scan until we have enough results
         try
         {
-            IDiskAtomFilter commandPredicate = command.predicate;
-
             int cql3RowCount = 0;
             rows = new ArrayList<Row>();
-            List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+            List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.keyRange);
             int i = 0;
             AbstractBounds<RowPosition> nextRange = null;
             List<InetAddress> nextEndpoints = null;
@@ -1408,15 +1406,7 @@ public class StorageProxy implements StorageProxyMBean
                     ++i;
                 }
 
-                RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
-                                                                  command.column_family,
-                                                                  command.timestamp,
-                                                                  commandPredicate,
-                                                                  range,
-                                                                  command.row_filter,
-                                                                  command.maxResults,
-                                                                  command.countCQL3Rows,
-                                                                  command.isPaging);
+                AbstractRangeCommand nodeCmd = command.forSubRange(range);
 
                 // collect replies and resolve according to consistency level
                 RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
@@ -1431,7 +1421,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
+                    MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
                         Tracing.trace("Enqueuing request to {}", endpoint);
@@ -1444,8 +1434,8 @@ public class StorageProxy implements StorageProxyMBean
                     for (Row row : handler.get())
                     {
                         rows.add(row);
-                        if (nodeCmd.countCQL3Rows)
-                            cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
+                        if (nodeCmd.countCQL3Rows())
+                            cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
@@ -1462,18 +1452,9 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // if we're done, great, otherwise, move to the next range
-                int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size();
-                if (count >= nodeCmd.maxResults)
+                int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+                if (count >= nodeCmd.limit())
                     break;
-
-                // if we are paging and already got some rows, reset the column filter predicate,
-                // so we start iterating the next row from the first column
-                if (!rows.isEmpty() && command.isPaging)
-                {
-                    // We only allow paging with a slice filter (doesn't make sense otherwise anyway)
-                    assert commandPredicate instanceof SliceQueryFilter;
-                    commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY);
-                }
             }
         }
         finally
@@ -1483,13 +1464,13 @@ public class StorageProxy implements StorageProxyMBean
         return trim(command, rows);
     }
 
-    private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
+    private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
     {
-        // When countCQL3Rows, we let the caller trim the result.
-        if (command.countCQL3Rows)
+        // When maxIsColumns, we let the caller trim the result.
+        if (command.countCQL3Rows())
             return rows;
         else
-            return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows;
+            return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
     }
 
     /**