You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:39 UTC

[1/4] Add auto paging capability to the native protocol

Updated Branches:
  refs/heads/trunk 40bc4456f -> e48ff2938


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index e334b02..860c404 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+
 import com.google.common.collect.ImmutableMap;
 import org.jboss.netty.buffer.ChannelBuffer;
 
@@ -44,6 +45,7 @@ public class QueryMessage extends Message.Request
         {
             String query = CBUtil.readLongString(body);
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+            int resultPageSize = body.readInt();
             List<ByteBuffer> values;
             if (body.readable())
             {
@@ -56,10 +58,10 @@ public class QueryMessage extends Message.Request
             {
                 values = Collections.emptyList();
             }
-            return new QueryMessage(query, values, consistency);
+            return new QueryMessage(query, values, consistency, resultPageSize);
         }
 
-        public ChannelBuffer encode(QueryMessage msg)
+        public ChannelBuffer encode(QueryMessage msg, int version)
         {
             // We have:
             //   - query
@@ -68,11 +70,13 @@ public class QueryMessage extends Message.Request
             //   - Number of values
             //   - The values
             int vs = msg.values.size();
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3 + (vs > 0 ? 1 : 0), 0, vs);
             builder.add(CBUtil.longStringToCB(msg.query));
             builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
-            if (vs > 0 && msg.getVersion() > 1)
+            builder.add(CBUtil.intToCB(msg.resultPageSize));
+            if (vs > 0)
             {
+                assert version > 1 : "Version 1 of the protocol do not allow values";
                 builder.add(CBUtil.shortToCB(vs));
                 for (ByteBuffer value : msg.values)
                     builder.addValue(value);
@@ -83,30 +87,35 @@ public class QueryMessage extends Message.Request
 
     public final String query;
     public final ConsistencyLevel consistency;
+    public final int resultPageSize;
     public final List<ByteBuffer> values;
 
     public QueryMessage(String query, ConsistencyLevel consistency)
     {
-        this(query, Collections.<ByteBuffer>emptyList(), consistency);
+        this(query, Collections.<ByteBuffer>emptyList(), consistency, -1);
     }
 
-    public QueryMessage(String query, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public QueryMessage(String query, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
         super(Type.QUERY);
         this.query = query;
+        this.resultPageSize = resultPageSize;
         this.consistency = consistency;
         this.values = values;
     }
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)
     {
         try
         {
+            if (resultPageSize == 0)
+                throw new ProtocolException("The page size cannot be 0");
+
             UUID tracingId = null;
             if (isTracingRequested())
             {
@@ -117,10 +126,16 @@ public class QueryMessage extends Message.Request
             if (state.traceNextQuery())
             {
                 state.createTracingSession();
-                Tracing.instance.begin("Execute CQL3 query", ImmutableMap.of("query", query));
+
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                builder.put("query", query);
+                if (resultPageSize > 0)
+                    builder.put("page_size", Integer.toString(resultPageSize));
+
+                Tracing.instance.begin("Execute CQL3 query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.process(query, values, consistency, state);
+            Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -136,6 +151,9 @@ public class QueryMessage extends Message.Request
         finally
         {
             Tracing.instance.stopSession();
+            // Trash the current session id if we won't need it anymore
+            if (!state.hasPager())
+                state.getAndResetCurrentTracingSession();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index 63899e1..414fdd3 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -34,7 +34,7 @@ public class ReadyMessage extends Message.Response
             return new ReadyMessage();
         }
 
-        public ChannelBuffer encode(ReadyMessage msg)
+        public ChannelBuffer encode(ReadyMessage msg, int version)
         {
             return ChannelBuffers.EMPTY_BUFFER;
         }
@@ -47,7 +47,7 @@ public class ReadyMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 9969e3a..a6816fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -39,7 +39,7 @@ public class RegisterMessage extends Message.Request
             return new RegisterMessage(eventTypes);
         }
 
-        public ChannelBuffer encode(RegisterMessage msg)
+        public ChannelBuffer encode(RegisterMessage msg, int version)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
             cb.writeShort(msg.eventTypes.size());
@@ -69,7 +69,7 @@ public class RegisterMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index fcff0a8..33975a4 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -40,7 +40,7 @@ public abstract class ResultMessage extends Message.Response
             return kind.subcodec.decode(body, version);
         }
 
-        public ChannelBuffer encode(ResultMessage msg)
+        public ChannelBuffer encode(ResultMessage msg, int version)
         {
             ChannelBuffer kcb = ChannelBuffers.buffer(4);
             kcb.writeInt(msg.kind.id);
@@ -101,7 +101,7 @@ public abstract class ResultMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     protected abstract ChannelBuffer encodeBody();
@@ -124,7 +124,7 @@ public abstract class ResultMessage extends Message.Response
                 return new Void();
             }
 
-            public ChannelBuffer encode(ResultMessage msg)
+            public ChannelBuffer encode(ResultMessage msg, int version)
             {
                 assert msg instanceof Void;
                 return ChannelBuffers.EMPTY_BUFFER;
@@ -133,7 +133,7 @@ public abstract class ResultMessage extends Message.Response
 
         protected ChannelBuffer encodeBody()
         {
-            return subcodec.encode(this);
+            return subcodec.encode(this, getVersion());
         }
 
         public CqlResult toThriftResult()
@@ -166,7 +166,7 @@ public abstract class ResultMessage extends Message.Response
                 return new SetKeyspace(keyspace);
             }
 
-            public ChannelBuffer encode(ResultMessage msg)
+            public ChannelBuffer encode(ResultMessage msg, int version)
             {
                 assert msg instanceof SetKeyspace;
                 return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
@@ -175,7 +175,7 @@ public abstract class ResultMessage extends Message.Response
 
         protected ChannelBuffer encodeBody()
         {
-            return subcodec.encode(this);
+            return subcodec.encode(this, getVersion());
         }
 
         public CqlResult toThriftResult()
@@ -199,11 +199,11 @@ public abstract class ResultMessage extends Message.Response
                 return new Rows(ResultSet.codec.decode(body, version));
             }
 
-            public ChannelBuffer encode(ResultMessage msg)
+            public ChannelBuffer encode(ResultMessage msg, int version)
             {
                 assert msg instanceof Rows;
                 Rows rowMsg = (Rows)msg;
-                return ResultSet.codec.encode(rowMsg.result);
+                return ResultSet.codec.encode(rowMsg.result, version);
             }
         };
 
@@ -217,7 +217,7 @@ public abstract class ResultMessage extends Message.Response
 
         protected ChannelBuffer encodeBody()
         {
-            return subcodec.encode(this);
+            return subcodec.encode(this, getVersion());
         }
 
         public CqlResult toThriftResult()
@@ -243,12 +243,12 @@ public abstract class ResultMessage extends Message.Response
                 return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body, version));
             }
 
-            public ChannelBuffer encode(ResultMessage msg)
+            public ChannelBuffer encode(ResultMessage msg, int version)
             {
                 assert msg instanceof Prepared;
                 Prepared prepared = (Prepared)msg;
                 assert prepared.statementId != null;
-                return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata));
+                return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata, version));
             }
         };
 
@@ -278,7 +278,7 @@ public abstract class ResultMessage extends Message.Response
 
         protected ChannelBuffer encodeBody()
         {
-            return subcodec.encode(this);
+            return subcodec.encode(this, getVersion());
         }
 
         public CqlResult toThriftResult()
@@ -347,7 +347,7 @@ public abstract class ResultMessage extends Message.Response
 
             }
 
-            public ChannelBuffer encode(ResultMessage msg)
+            public ChannelBuffer encode(ResultMessage msg, int version)
             {
                 assert msg instanceof SchemaChange;
                 SchemaChange scm = (SchemaChange)msg;
@@ -361,7 +361,7 @@ public abstract class ResultMessage extends Message.Response
 
         protected ChannelBuffer encodeBody()
         {
-            return subcodec.encode(this);
+            return subcodec.encode(this, getVersion());
         }
 
         public CqlResult toThriftResult()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 66b245b..ea6ae99 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -45,7 +45,7 @@ public class StartupMessage extends Message.Request
             return new StartupMessage(CBUtil.readStringMap(body));
         }
 
-        public ChannelBuffer encode(StartupMessage msg)
+        public ChannelBuffer encode(StartupMessage msg, int version)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
             CBUtil.writeStringMap(cb, msg.options);
@@ -63,7 +63,7 @@ public class StartupMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 8f7873d..7318112 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -38,7 +38,7 @@ public class SupportedMessage extends Message.Response
             return new SupportedMessage(CBUtil.readStringToStringListMap(body));
         }
 
-        public ChannelBuffer encode(SupportedMessage msg)
+        public ChannelBuffer encode(SupportedMessage msg, int version)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
             CBUtil.writeStringToStringListMap(cb, msg.supported);
@@ -56,7 +56,7 @@ public class SupportedMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 39802cc..e5ee5f3 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -1043,9 +1043,15 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         for (int i = 0; i < 4; i++)
             cols[i] = column("c" + i, "value", 1);
 
-        putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3]);
-        putColsStandard(cfs, Util.dk("b"), cols[0], cols[1], cols[2]);
-        putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
+        DecoratedKey ka = Util.dk("a");
+        DecoratedKey kb = Util.dk("b");
+        DecoratedKey kc = Util.dk("c");
+
+        RowPosition min = Util.rp("");
+
+        putColsStandard(cfs, ka, cols[0], cols[1], cols[2], cols[3]);
+        putColsStandard(cfs, kb, cols[0], cols[1], cols[2]);
+        putColsStandard(cfs, kc, cols[0], cols[1], cols[2], cols[3]);
         cfs.forceBlockingFlush();
 
         SlicePredicate sp = new SlicePredicate();
@@ -1054,58 +1060,87 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
         sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
 
-        Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""),
-                                                 null,
-                                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
-                                                 3,
-                                                 System.currentTimeMillis(),
-                                                 true,
-                                                 true);
-        assert rows.size() == 1 : "Expected 1 row, got " + rows;
-        Row row = rows.iterator().next();
+        Collection<Row> rows;
+        Row row, row1, row2;
+        IDiskAtomFilter filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(Util.range("", ""), filter, null, 3, true, true, System.currentTimeMillis()));
+        assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
+        row = rows.iterator().next();
         assertColumnNames(row, "c0", "c1", "c2");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
-        rows = cfs.getRangeSlice(Util.range("", ""),
-                                 null,
-                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
-                                 3,
-                                 System.currentTimeMillis(),
-                                 true,
-                                 true);
-        assert rows.size() == 2 : "Expected 2 rows, got " + rows;
+        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(ka, min), filter, null, 3, true, true, System.currentTimeMillis()));
+        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
         Iterator<Row> iter = rows.iterator();
-        Row row1 = iter.next();
-        Row row2 = iter.next();
+        row1 = iter.next();
+        row2 = iter.next();
         assertColumnNames(row1, "c2", "c3");
         assertColumnNames(row2, "c0");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c0")));
-        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")),
-                                 null,
-                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
-                                 3,
-                                 System.currentTimeMillis(),
-                                 true,
-                                 true);
-        assert rows.size() == 1 : "Expected 1 row, got " + rows;
+        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(row2.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
+        assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
         row = rows.iterator().next();
         assertColumnNames(row, "c0", "c1", "c2");
 
         sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
-        rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")),
-                                 null,
-                                 ThriftValidation.asIFilter(sp, cfs.metadata, null),
-                                 3,
-                                 System.currentTimeMillis(),
-                                 true,
-                                 true);
-        assert rows.size() == 2 : "Expected 2 rows, got " + rows;
+        filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(row.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
+        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
         iter = rows.iterator();
         row1 = iter.next();
         row2 = iter.next();
         assertColumnNames(row1, "c2");
         assertColumnNames(row2, "c0", "c1");
+
+        // Paging within bounds
+        SliceQueryFilter sf = new SliceQueryFilter(ByteBufferUtil.bytes("c1"),
+                                                   ByteBufferUtil.bytes("c2"),
+                                                   false,
+                                                   0);
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(ka, kc), sf, ByteBufferUtil.bytes("c2"), ByteBufferUtil.bytes("c1"), null, 2, System.currentTimeMillis()));
+        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
+        iter = rows.iterator();
+        row1 = iter.next();
+        row2 = iter.next();
+        assertColumnNames(row1, "c2");
+        assertColumnNames(row2, "c1");
+
+        rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(kb, kc), sf, ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes("c1"), null, 10, System.currentTimeMillis()));
+        assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
+        iter = rows.iterator();
+        row1 = iter.next();
+        row2 = iter.next();
+        assertColumnNames(row1, "c1", "c2");
+        assertColumnNames(row2, "c1");
+    }
+
+    private static String toString(Collection<Row> rows)
+    {
+        try
+        {
+            StringBuilder sb = new StringBuilder();
+            for (Row row : rows)
+            {
+                sb.append("{");
+                sb.append(ByteBufferUtil.string(row.key.key));
+                sb.append(":");
+                if (row.cf != null && !row.cf.isEmpty())
+                {
+                    for (Column c : row.cf)
+                        sb.append(" ").append(ByteBufferUtil.string(c.name()));
+                }
+                sb.append("} ");
+            }
+            return sb.toString();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private static void assertColumnNames(Row row, String ... columnNames) throws Exception
@@ -1205,7 +1240,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, LongType.instance.decompose(1L));
         // explicitly tell to the KeysSearcher to use column limiting for rowsPerQuery to trigger bogus columnsRead--; (CASSANDRA-3996)
-        List<Row> rows = store.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), 10, System.currentTimeMillis(), true);
+        List<Row> rows = store.search(store.makeExtendedFilter(Util.range("", ""), new IdentityQueryFilter(), Arrays.asList(expr), 10, true, false, System.currentTimeMillis()));
 
         assert rows.size() == 10;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 1fe7a46..790163d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -147,7 +148,8 @@ public class CompactionsTest extends SchemaLoader
 
         // check that the shadowed column is gone
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter(), System.currentTimeMillis()), key);
+        Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound());
+        SSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
         OnDiskAtomIterator iter = scanner.next();
         assertEquals(key, iter.getKey());
         assert iter.next() instanceof RangeTombstone;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index c14e860..8c4c305 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -30,8 +30,10 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -140,7 +142,7 @@ public class TTLExpiryTest extends SchemaLoader
         cfs.enableAutoCompaction(true);
         assertEquals(1, cfs.getSSTables().size());
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter(), System.currentTimeMillis()));
+        SSTableScanner scanner = sstable.getScanner(DataRange.allData(sstable.partitioner));
         assertTrue(scanner.hasNext());
         while(scanner.hasNext())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
new file mode 100644
index 0000000..1d18321
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -0,0 +1,283 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static junit.framework.Assert.*;
+import static org.apache.cassandra.Util.bounds;
+import static org.apache.cassandra.Util.range;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class QueryPagerTest extends SchemaLoader
+{
+    private static final String KS = "Keyspace1";
+    private static final String CF = "Standard1";
+
+    private static String string(ByteBuffer bb)
+    {
+        try
+        {
+            return ByteBufferUtil.string(bb);
+        }
+        catch (CharacterCodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @BeforeClass
+    public static void addData()
+    {
+        cfs().clearUnsafe();
+
+        int nbKeys = 10;
+        int nbCols = 10;
+
+        /*
+         * Creates the following data:
+         *   k1: c1 ... cn
+         *   ...
+         *   ki: c1 ... cn
+         */
+        for (int i = 0; i < nbKeys; i++)
+        {
+            RowMutation rm = new RowMutation(KS, bytes("k" + i));
+            ColumnFamily cf = rm.addOrGet(CF);
+
+            for (int j = 0; j < nbCols; j++)
+                cf.addColumn(Util.column("c" + j, "", 0));
+
+            rm.applyUnsafe();
+        }
+    }
+
+    private static ColumnFamilyStore cfs()
+    {
+        return Table.open(KS).getColumnFamilyStore(CF);
+    }
+
+    private static String toString(List<Row> rows)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (Row row : rows)
+            sb.append(string(row.key.key)).append(":").append(toString(row.cf)).append("\n");
+        return sb.toString();
+    }
+
+    private static String toString(ColumnFamily cf)
+    {
+        if (cf == null)
+            return "";
+
+        StringBuilder sb = new StringBuilder();
+        for (Column c : cf)
+            sb.append(" ").append(string(c.name()));
+        return sb.toString();
+    }
+
+    private static ReadCommand namesQuery(String key, String... names)
+    {
+        SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(cfs().metadata.comparator);
+        for (String name : names)
+            s.add(bytes(name));
+        return new SliceByNamesReadCommand(KS, bytes(key), CF, System.currentTimeMillis(), new NamesQueryFilter(s, true));
+    }
+
+    private static ReadCommand sliceQuery(String key, String start, String end, int count)
+    {
+        SliceQueryFilter filter = new SliceQueryFilter(bytes(start), bytes(end), false, count);
+        return new SliceFromReadCommand(KS, bytes(key), CF, System.currentTimeMillis(), filter);
+    }
+
+    private static RangeSliceCommand rangeNamesQuery(AbstractBounds<RowPosition> range, int count, String... names)
+    {
+        SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(cfs().metadata.comparator);
+        for (String name : names)
+            s.add(bytes(name));
+        return new RangeSliceCommand(KS, CF, System.currentTimeMillis(), new NamesQueryFilter(s, true), range, count);
+    }
+
+    private static RangeSliceCommand rangeSliceQuery(AbstractBounds<RowPosition> range, int count, String start, String end)
+    {
+        SliceQueryFilter filter = new SliceQueryFilter(bytes(start), bytes(end), false, Integer.MAX_VALUE);
+        return new RangeSliceCommand(KS, CF, System.currentTimeMillis(), filter, range, count);
+    }
+
+    private static void assertRow(Row r, String key, String... names)
+    {
+        assertEquals(key, string(r.key.key));
+        assertNotNull(r.cf);
+        assertEquals(toString(r.cf), names.length, r.cf.getColumnCount());
+        int i = 0;
+        for (Column c : r.cf)
+        {
+            String expected = names[i++];
+            assertEquals("column " + i + " doesn't match: " + toString(r.cf), expected, string(c.name()));
+        }
+    }
+
+    @Test
+    public void NamesQueryTest() throws Exception
+    {
+        QueryPager pager = QueryPagers.localPager(namesQuery("k0", "c1", "c5", "c7", "c8"));
+
+        assertFalse(pager.isExhausted());
+        List<Row> page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k0", "c1", "c5", "c7", "c8");
+
+        assertTrue(pager.isExhausted());
+    }
+
+    @Test
+    public void SliceQueryTest() throws Exception
+    {
+        QueryPager pager = QueryPagers.localPager(sliceQuery("k0", "c1", "c8", 10));
+
+        List<Row> page;
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k0", "c1", "c2", "c3");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k0", "c4", "c5", "c6");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k0", "c7", "c8");
+
+        assertTrue(pager.isExhausted());
+    }
+
+    @Test
+    public void MultiQueryTest() throws Exception
+    {
+        QueryPager pager = QueryPagers.localPager(new Pageable.ReadCommands(new ArrayList<ReadCommand>() {{
+            add(sliceQuery("k1", "c2", "c6", 10));
+            add(sliceQuery("k4", "c3", "c5", 10));
+        }}));
+
+        List<Row> page;
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k1", "c2", "c3", "c4");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(4);
+        assertEquals(toString(page), 2, page.size());
+        assertRow(page.get(0), "k1", "c5", "c6");
+        assertRow(page.get(1), "k4", "c3", "c4");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k4", "c5");
+
+        assertTrue(pager.isExhausted());
+    }
+
+    @Test
+    public void RangeNamesQueryTest() throws Exception
+    {
+        QueryPager pager = QueryPagers.localPager(rangeNamesQuery(range("k0", "k5"), 100, "c1", "c4", "c8"));
+
+        List<Row> page;
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 3, page.size());
+        for (int i = 1; i <= 3; i++)
+            assertRow(page.get(i-1), "k" + i, "c1", "c4", "c8");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(3);
+        assertEquals(toString(page), 2, page.size());
+        for (int i = 4; i <= 5; i++)
+            assertRow(page.get(i-4), "k" + i, "c1", "c4", "c8");
+
+        assertTrue(pager.isExhausted());
+    }
+
+    @Test
+    public void RangeSliceQueryTest() throws Exception
+    {
+        QueryPager pager = QueryPagers.localPager(rangeSliceQuery(range("k1", "k5"), 100, "c1", "c7"));
+
+        List<Row> page;
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(5);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k2", "c1", "c2", "c3", "c4", "c5");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(4);
+        assertEquals(toString(page), 2, page.size());
+        assertRow(page.get(0), "k2", "c6", "c7");
+        assertRow(page.get(1), "k3", "c1", "c2");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(6);
+        assertEquals(toString(page), 2, page.size());
+        assertRow(page.get(0), "k3", "c3", "c4", "c5", "c6", "c7");
+        assertRow(page.get(1), "k4", "c1");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(5);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k4", "c2", "c3", "c4", "c5", "c6");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(5);
+        assertEquals(toString(page), 2, page.size());
+        assertRow(page.get(0), "k4", "c7");
+        assertRow(page.get(1), "k5", "c1", "c2", "c3", "c4");
+
+        assertFalse(pager.isExhausted());
+        page = pager.fetchPage(5);
+        assertEquals(toString(page), 1, page.size());
+        assertRow(page.get(0), "k5", "c5", "c6", "c7");
+
+        assertTrue(pager.isExhausted());
+    }
+}


[2/4] Add auto paging capability to the native protocol

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
new file mode 100644
index 0000000..460bc44
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+abstract class AbstractQueryPager implements QueryPager
+{
+    private final ConsistencyLevel consistencyLevel;
+    private final boolean localQuery;
+
+    protected final CFMetaData cfm;
+    protected final IDiskAtomFilter columnFilter;
+    private final long timestamp;
+
+    private volatile int remaining;
+    private volatile boolean exhausted;
+    private volatile boolean lastWasRecorded;
+
+    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
+                                 int toFetch,
+                                 boolean localQuery,
+                                 String keyspace,
+                                 String columnFamily,
+                                 IDiskAtomFilter columnFilter,
+                                 long timestamp)
+    {
+        this.consistencyLevel = consistencyLevel;
+        this.localQuery = localQuery;
+
+        this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+        this.columnFilter = columnFilter;
+        this.timestamp = timestamp;
+
+        this.remaining = toFetch;
+    }
+
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        if (isExhausted())
+            return Collections.emptyList();
+
+        int currentPageSize = nextPageSize(pageSize);
+        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+
+        if (rows.isEmpty())
+        {
+            exhausted = true;
+            return Collections.emptyList();
+        }
+
+        int liveCount = getPageLiveCount(rows);
+        remaining -= liveCount;
+
+        // If we've got less than requested, there is no more query to do (but
+        // we still need to return the current page)
+        if (liveCount < currentPageSize)
+            exhausted = true;
+
+        // If it's not the first query and the first column is the last one returned (likely
+        // but not certain since paging can race with deletes/expiration), then remove the
+        // first column.
+        if (containsPreviousLast(rows.get(0)))
+        {
+            rows = discardFirst(rows);
+            remaining++;
+        }
+        // Otherwise, if 'lastWasRecorded', we queried for one more than the page size,
+        // so if the page was is full, trim the last entry
+        else if (lastWasRecorded && !exhausted)
+        {
+            // We've asked for one more than necessary
+            rows = discardLast(rows);
+            remaining++;
+        }
+
+        if (!isExhausted())
+            lastWasRecorded = recordLast(rows.get(rows.size() - 1));
+
+        return rows;
+    }
+
+    private List<Row> filterEmpty(List<Row> result)
+    {
+        boolean doCopy = false;
+        for (Row row : result)
+        {
+            if (row.cf == null || row.cf.getColumnCount() == 0)
+            {
+                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
+                for (Row row2 : result)
+                {
+                    if (row.cf == null || row.cf.getColumnCount() == 0)
+                        continue;
+
+                    newResult.add(row2);
+                }
+                return newResult;
+            }
+        }
+        return result;
+    }
+
+    public boolean isExhausted()
+    {
+        return exhausted || remaining == 0;
+    }
+
+    public int maxRemaining()
+    {
+        return remaining;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    private int nextPageSize(int pageSize)
+    {
+        return Math.min(remaining, pageSize) + (lastWasRecorded ? 1 : 0);
+    }
+
+    public ColumnCounter columnCounter()
+    {
+        return columnFilter.columnCounter(cfm.comparator, timestamp);
+    }
+
+    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
+    protected abstract boolean containsPreviousLast(Row first);
+    protected abstract boolean recordLast(Row last);
+
+    private List<Row> discardFirst(List<Row> rows)
+    {
+        Row first = rows.get(0);
+        ColumnFamily newCf = discardFirst(first.cf);
+
+        int count = newCf.getColumnCount();
+        List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+        if (count != 0)
+            newRows.add(new Row(first.key, newCf));
+        newRows.addAll(rows.subList(1, rows.size()));
+
+        return newRows;
+    }
+
+    private List<Row> discardLast(List<Row> rows)
+    {
+        Row last = rows.get(rows.size() - 1);
+        ColumnFamily newCf = discardLast(last.cf);
+
+        int count = newCf.getColumnCount();
+        List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+        newRows.addAll(rows.subList(0, rows.size() - 1));
+        if (count != 0)
+            newRows.add(new Row(last.key, newCf));
+
+        return newRows;
+    }
+
+    private int getPageLiveCount(List<Row> page)
+    {
+        int count = 0;
+        for (Row row : page)
+            count += columnCounter().countAll(row.cf).live();
+        return count;
+    }
+
+    private ColumnFamily discardFirst(ColumnFamily cf)
+    {
+        ColumnFamily copy = cf.cloneMeShallow();
+        ColumnCounter counter = columnCounter();
+
+        Iterator<Column> iter = cf.iterator();
+        // Discard the first live
+        while (iter.hasNext())
+        {
+            Column c = iter.next();
+            counter.count(c, cf);
+            if (counter.live() > 1)
+            {
+                copy.addColumn(c);
+                while (iter.hasNext())
+                    copy.addColumn(iter.next());
+            }
+        }
+        return copy;
+    }
+
+    private ColumnFamily discardLast(ColumnFamily cf)
+    {
+        ColumnFamily copy = cf.cloneMeShallow();
+        // Redoing the counting like that is not extremely efficient, but
+        // discardLast is only called in case of a race between paging and
+        // a deletion, which is pretty unlikely, so probably not a big deal
+        int liveCount = columnCounter().countAll(cf).live();
+
+        ColumnCounter counter = columnCounter();
+        // Discard the first live
+        for (Column c : cf)
+        {
+            counter.count(c, cf);
+            if (counter.live() < liveCount)
+                copy.addColumn(c);
+        }
+        return copy;
+    }
+
+    protected static ByteBuffer firstName(ColumnFamily cf)
+    {
+        return cf.iterator().next().name();
+    }
+
+    protected static ByteBuffer lastName(ColumnFamily cf)
+    {
+        return cf.getReverseSortedColumns().iterator().next().name();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
new file mode 100644
index 0000000..ef82535
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
+/**
+ * Pager over a list of ReadCommand.
+ *
+ * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
+ * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
+ * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize
+ * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * defeats the purpose of paging.
+ *
+ * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
+ * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * blow out memory.
+ */
+class MultiPartitionPager implements QueryPager
+{
+    private final SinglePartitionPager[] pagers;
+    private final long timestamp;
+
+    private volatile int current;
+
+    MultiPartitionPager(List<ReadCommand> commands, final ConsistencyLevel consistencyLevel, final boolean localQuery)
+    {
+        this.pagers = new SinglePartitionPager[commands.size()];
+
+        long tstamp = -1;
+        for (int i = 0; i < commands.size(); i++)
+        {
+            ReadCommand command = commands.get(i);
+            if (tstamp == -1)
+                tstamp = command.timestamp;
+            else if (tstamp != command.timestamp)
+                throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
+
+            pagers[i] = command instanceof SliceFromReadCommand
+                      ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery)
+                      : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+        }
+        timestamp = tstamp;
+    }
+
+    public boolean isExhausted()
+    {
+        while (current < pagers.length)
+        {
+            if (!pagers[current].isExhausted())
+                return false;
+
+            current++;
+        }
+        return true;
+    }
+
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        int remaining = pageSize;
+        List<Row> result = new ArrayList<Row>();
+
+        while (!isExhausted() && remaining > 0)
+        {
+            // Exhausted also sets us on the first non-exhausted pager
+            List<Row> page = pagers[current].fetchPage(remaining);
+            if (page.isEmpty())
+                continue;
+
+            Row row = page.get(0);
+            remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+            result.add(row);
+        }
+
+        return result;
+    }
+
+    public int maxRemaining()
+    {
+        int max = 0;
+        for (int i = current; i < pagers.length; i++)
+            max += pagers[i].maxRemaining();
+        return max;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
new file mode 100644
index 0000000..82e7376
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceByNamesReadCommand.
+ */
+public class NamesQueryPager implements SinglePartitionPager
+{
+    private final SliceByNamesReadCommand command;
+    private final ConsistencyLevel consistencyLevel;
+    private final boolean localQuery;
+
+    private volatile boolean queried;
+
+    /**
+     * For now, we'll only use this in CQL3. In there, as name query can never
+     * yield more than one CQL3 row, there is no need for paging and so this is straight-forward.
+     *
+     * For thrift, we could imagine needing to page, though even then it's very
+     * unlikely unless the pageSize is very small.
+     *
+     * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that
+     * count every cell individually) and the names filter asks for more than pageSize columns.
+     */
+    // Don't use directly, use QueryPagers method instead
+    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        this.command = command;
+        this.consistencyLevel = consistencyLevel;
+        this.localQuery = localQuery;
+    }
+
+    public ColumnCounter columnCounter()
+    {
+        // We know NamesQueryFilter.columnCounter don't care about his argument
+        return command.filter.columnCounter(null, command.timestamp);
+    }
+
+    public boolean isExhausted()
+    {
+        return queried;
+    }
+
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
+
+        if (isExhausted())
+            return Collections.<Row>emptyList();
+
+        queried = true;
+        return localQuery
+             ? Collections.singletonList(command.getRow(Table.open(command.table)))
+             : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel);
+    }
+
+    public int maxRemaining()
+    {
+        if (queried)
+            return 0;
+
+        return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size();
+    }
+
+    public long timestamp()
+    {
+        return command.timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
new file mode 100644
index 0000000..3a69bf4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/Pageable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.ReadCommand;
+
+/**
+ * Marker interface for commands that can be paged.
+ */
+public interface Pageable
+{
+    public static class ReadCommands implements Pageable
+    {
+        public final List<ReadCommand> commands;
+
+        public ReadCommands(List<ReadCommand> commands)
+        {
+            this.commands = commands;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
new file mode 100644
index 0000000..a390859
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Perform a query, paging it by page of a given size.
+ *
+ * This is essentially an iterator of pages. Each call to fetchPage() will
+ * return the next page (i.e. the next list of rows) and isExhausted()
+ * indicates whether there is more page to fetch. The pageSize will
+ * either be in term of cells or in term of CQL3 row, depending on the
+ * parameters of the command we page.
+ *
+ * Please note that the pager might page within rows, so there is no guarantee
+ * that successive pages won't return the same row (though with different
+ * columns every time).
+ *
+ * Also, there is no guarantee that fetchPage() won't return an empty list,
+ * even if isExhausted() return false (but it is guaranteed to return an empty
+ * list *if* isExhausted() return true). Indeed, isExhausted() does *not*
+ * trigger a query so in some (failry rare) case we might not know the paging
+ * is done even though it is.
+ */
+public interface QueryPager
+{
+    /**
+     * Fetches the next page.
+     *
+     * @param pageSize the maximum number of elements to return in the next page.
+     * @return the page of result.
+     */
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+
+    /**
+     * Whether or not this pager is exhausted, i.e. whether or not a call to
+     * fetchPage may return more result.
+     *
+     * @return whether the pager is exhausted.
+     */
+    public boolean isExhausted();
+
+    /**
+     * The maximum number of cells/CQL3 row that we may still have to return.
+     * In other words, that's the initial user limit minus what we've already
+     * returned (note that it's not how many we *will* return, just the upper
+     * limit on it).
+     */
+    public int maxRemaining();
+
+    /**
+     * The timestamp used by the last page.
+     */
+    public long timestamp();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
new file mode 100644
index 0000000..9bc3afd
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Static utility methods to create query pagers.
+ */
+public class QueryPagers
+{
+    private QueryPagers() {};
+
+    private static int maxQueried(ReadCommand command)
+    {
+        if (command instanceof SliceByNamesReadCommand)
+        {
+            NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
+            return filter.countCQL3Rows() ? 1 : filter.columns.size();
+        }
+        else
+        {
+            SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
+            return filter.count;
+        }
+    }
+
+    public static boolean mayNeedPaging(Pageable command, int pageSize)
+    {
+        if (command instanceof Pageable.ReadCommands)
+        {
+            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+
+            int maxQueried = 0;
+            for (ReadCommand readCmd : commands)
+                maxQueried += maxQueried(readCmd);
+
+            return maxQueried > pageSize;
+        }
+        else if (command instanceof ReadCommand)
+        {
+            return maxQueried((ReadCommand)command) > pageSize;
+        }
+        else
+        {
+            assert command instanceof RangeSliceCommand;
+            // We can never be sure a range slice won't need paging
+            return true;
+        }
+    }
+
+    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local)
+    {
+        if (command instanceof SliceByNamesReadCommand)
+            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+        else
+            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local);
+    }
+
+    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local)
+    {
+        if (command instanceof Pageable.ReadCommands)
+        {
+            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+            if (commands.size() == 1)
+                return pager(commands.get(0), consistencyLevel, local);
+
+            return new MultiPartitionPager(commands, consistencyLevel, local);
+        }
+        else if (command instanceof ReadCommand)
+        {
+            return pager((ReadCommand)command, consistencyLevel, local);
+        }
+        else
+        {
+            assert command instanceof RangeSliceCommand;
+            RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
+            if (rangeCommand.predicate instanceof NamesQueryFilter)
+                return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local);
+            else
+                return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local);
+        }
+    }
+
+    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
+    {
+        return pager(command, consistencyLevel, false);
+    }
+
+    public static QueryPager localPager(Pageable command)
+    {
+        return pager(command, null, true);
+    }
+
+    /**
+     * Convenience method to (locally) page an internal row.
+     * Used to 2ndary index a wide row without dying.
+     */
+    public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
+    {
+        SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
+        final SliceQueryPager pager = new SliceQueryPager(command, null, true);
+
+        return new Iterator<ColumnFamily>()
+        {
+            // We don't use AbstractIterator because we don't want hasNext() to do an actual query
+            public boolean hasNext()
+            {
+                return !pager.isExhausted();
+            }
+
+            public ColumnFamily next()
+            {
+                try
+                {
+                    List<Row> rows = pager.fetchPage(pageSize);
+                    ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
+                    return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
+     */
+    public static int countPaged(String keyspace,
+                                String columnFamily,
+                                ByteBuffer key,
+                                SliceQueryFilter filter,
+                                ConsistencyLevel consistencyLevel,
+                                final int pageSize,
+                                long now) throws RequestValidationException, RequestExecutionException
+    {
+        SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
+        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
+
+        ColumnCounter counter = filter.columnCounter(Schema.instance.getComparator(keyspace, columnFamily), now);
+        while (!pager.isExhausted())
+        {
+            List<Row> next = pager.fetchPage(pageSize);
+            if (!next.isEmpty())
+                counter.countAll(next.get(0).cf);
+        }
+        return counter.live();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
new file mode 100644
index 0000000..e4d4295
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a name query.
+ *
+ * Note: this only work for NamesQueryFilter that have countCQL3Rows() set,
+ * because this assumes the pageSize is counted in number of internal rows
+ * returned. More precisely, this doesn't do in-row paging so this assumes
+ * that the counter returned by columnCounter() will count 1 for each internal
+ * row.
+ */
+public class RangeNamesQueryPager extends AbstractQueryPager
+{
+    private final RangeSliceCommand command;
+    private volatile RowPosition lastReturnedKey;
+
+    // Don't use directly, use QueryPagers method instead
+    RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+        this.command = command;
+        assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestExecutionException
+    {
+        AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+        if (lastReturnedKey != null)
+            pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey));
+
+        return localQuery
+             ? pageCmd.executeLocally()
+             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        // When querying the next page, we create a bound that exclude the lastReturnedKey
+        return false;
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturnedKey = last.key;
+        // We return false as that means "can that last be in the next query?"
+        return false;
+    }
+
+    private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+    {
+        // We return a range that always exclude lastReturnedKey, since we've already
+        // returned it.
+        AbstractBounds<RowPosition> bounds = command.keyRange;
+        if (bounds instanceof Range || bounds instanceof Bounds)
+        {
+            return new Range<RowPosition>(lastReturnedKey, bounds.right);
+        }
+        else
+        {
+            return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
new file mode 100644
index 0000000..578d5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a slice query.
+ *
+ * Note: this only work for CQL3 queries for now (because thrift queries expect
+ * a different limit on the rows than on the columns, which complicates it).
+ */
+public class RangeSliceQueryPager extends AbstractQueryPager
+{
+    private final RangeSliceCommand command;
+    private volatile RowPosition lastReturnedKey;
+    private volatile ByteBuffer lastReturnedName;
+
+    // Don't use directly, use QueryPagers method instead
+    RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+        this.command = command;
+        assert columnFilter instanceof SliceQueryFilter;
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestExecutionException
+    {
+        SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
+        AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey);
+        ByteBuffer start = lastReturnedName == null ? sf.start() : lastReturnedName;
+        PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
+                                                          command.columnFamily,
+                                                          command.timestamp,
+                                                          keyRange,
+                                                          sf,
+                                                          start,
+                                                          sf.finish(),
+                                                          command.rowFilter,
+                                                          pageSize);
+
+        return localQuery
+             ? pageCmd.executeLocally()
+             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        return lastReturnedKey != null
+            && lastReturnedKey.equals(first.key)
+            && lastReturnedName.equals(firstName(first.cf));
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturnedKey = last.key;
+        lastReturnedName = lastName(last.cf);
+        return true;
+    }
+
+    private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+    {
+        // We always include lastReturnedKey since we may still be paging within a row,
+        // and PagedRangeCommand will move over if we're not anyway
+        AbstractBounds<RowPosition> bounds = command.keyRange;
+        if (bounds instanceof Range || bounds instanceof Bounds)
+        {
+            return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+        }
+        else
+        {
+            return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
new file mode 100644
index 0000000..693a20e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import org.apache.cassandra.db.filter.ColumnCounter;
+
+/**
+ * Common interface to single partition queries (by slice and by name).
+ *
+ * For use by MultiPartitionPager.
+ */
+public interface SinglePartitionPager extends QueryPager
+{
+    public ColumnCounter columnCounter();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
new file mode 100644
index 0000000..58ef3c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceFromReadCommand.
+ */
+public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
+{
+    private final SliceFromReadCommand command;
+
+    private volatile ByteBuffer lastReturned;
+
+    // Don't use directly, use QueryPagers method instead
+    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.filter.count, localQuery, command.table, command.cfName, command.filter, command.timestamp);
+        this.command = command;
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestValidationException, RequestExecutionException
+    {
+        SliceQueryFilter filter = command.filter.withUpdatedCount(pageSize);
+        if (lastReturned != null)
+            filter = filter.withUpdatedStart(lastReturned, cfm.comparator);
+
+        ReadCommand pageCmd = command.withUpdatedFilter(filter);
+        return localQuery
+             ? Collections.singletonList(pageCmd.getRow(Table.open(command.table)))
+             : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel);
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        return lastReturned != null && lastReturned.equals(firstName(first.cf));
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturned = lastName(last.cf);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 55688a8..a15927e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.cql.QueryProcessor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -60,6 +61,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -368,31 +370,25 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
-                                                                             List<ByteBuffer> keys,
-                                                                             ColumnParent column_parent,
-                                                                             long timestamp,
-                                                                             SlicePredicate predicate,
-                                                                             ConsistencyLevel consistency_level)
-    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    private SliceQueryFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
     {
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
-        ThriftValidation.validateColumnParent(metadata, column_parent);
-        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-
-        org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
-        consistencyLevel.validateForRead(keyspace);
+        SliceQueryFilter filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
+        if (metadata.isSuper())
+            filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, parent.bufferForSuper_column(), filter);
+        return filter;
+    }
 
-        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+    private IDiskAtomFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+    {
         IDiskAtomFilter filter;
         if (predicate.column_names != null)
         {
             if (metadata.isSuper())
             {
                 CompositeType type = (CompositeType)metadata.comparator;
-                SortedSet s = new TreeSet<ByteBuffer>(column_parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
+                SortedSet s = new TreeSet<ByteBuffer>(parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
                 s.addAll(predicate.column_names);
-                filter = SuperColumns.fromSCNamesFilter(type, column_parent.bufferForSuper_column(), new NamesQueryFilter(s));
+                filter = SuperColumns.fromSCNamesFilter(type, parent.bufferForSuper_column(), new NamesQueryFilter(s));
             }
             else
             {
@@ -403,11 +399,28 @@ public class CassandraServer implements Cassandra.Iface
         }
         else
         {
-            SliceRange range = predicate.slice_range;
-            filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
-            if (metadata.isSuper())
-                filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, column_parent.bufferForSuper_column(), filter);
+            filter = toInternalFilter(metadata, parent, predicate.slice_range);
         }
+        return filter;
+    }
+
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+                                                                             List<ByteBuffer> keys,
+                                                                             ColumnParent column_parent,
+                                                                             long timestamp,
+                                                                             SlicePredicate predicate,
+                                                                             ConsistencyLevel consistency_level)
+    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    {
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+        ThriftValidation.validateColumnParent(metadata, column_parent);
+        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+
+        org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+        consistencyLevel.validateForRead(keyspace);
+
+        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+        IDiskAtomFilter filter = toInternalFilter(metadata, column_parent, predicate);
 
         for (ByteBuffer key: keys)
         {
@@ -530,46 +543,22 @@ public class CassandraServer implements Cassandra.Iface
                 pageSize = COUNT_PAGE_SIZE;
             }
 
-            int totalCount = 0;
-            List<ColumnOrSuperColumn> columns;
-
-            if (predicate.slice_range == null)
-            {
-                predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       false,
-                                                       Integer.MAX_VALUE);
-            }
-
-            final int requestedCount = predicate.slice_range.count;
-            int remaining = requestedCount;
-            int pages = 0;
-            while (true)
-            {
-                predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
-                columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
-                if (columns.isEmpty())
-                    break;
-
-                ByteBuffer firstName = getName(columns.get(0));
-                int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
-
-                totalCount += newColumns;
-                // if we over-counted, just return original limit
-                if (totalCount > requestedCount)
-                    return requestedCount;
-                remaining -= newColumns;
-                pages++;
-                // We're done if either:
-                // - We've querying the number of columns requested by the user
-                // - last fetched page only contains the column we already fetched
-                if (remaining == 0 || ((columns.size() == 1) && (firstName.equals(predicate.slice_range.start))))
-                    break;
-                else
-                    predicate.slice_range.start = getName(columns.get(columns.size() - 1));
-            }
+            SliceRange sliceRange = predicate.slice_range == null
+                                  ? new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE)
+                                  : predicate.slice_range;
+            SliceQueryFilter filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
 
-            return totalCount;
+            return QueryPagers.countPaged(keyspace,
+                                          column_parent.column_family,
+                                          key,
+                                          filter,
+                                          ThriftConversion.fromThrift(consistency_level),
+                                          pageSize,
+                                          timestamp);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -774,8 +763,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null; // makes javac happy -- it can't tell that rethrow always throws
+            throw ThriftConversion.rethrow(e);
         }
         finally
         {
@@ -1218,22 +1206,16 @@ public class CassandraServer implements Cassandra.Iface
                 bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
             }
 
+            if (range.row_filter != null && !range.row_filter.isEmpty())
+                throw new InvalidRequestException("Cross-row paging is not supported along with index clauses");
+
             List<Row> rows;
             long now = System.currentTimeMillis();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
-                                                                        column_family,
-                                                                        now,
-                                                                        filter,
-                                                                        bounds,
-                                                                        range.row_filter,
-                                                                        range.count,
-                                                                        true,
-                                                                        true),
-                                                  consistencyLevel);
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, now, filter, bounds, null, range.count, true, true), consistencyLevel);
             }
             finally
             {
@@ -1878,8 +1860,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -1913,8 +1894,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -1994,8 +1974,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -2034,12 +2013,15 @@ public class CassandraServer implements Cassandra.Iface
                                                                 itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
 
-            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
+            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
+                                                                            ThriftConversion.fromThrift(cLevel),
+                                                                            cState.getQueryState(),
+                                                                            bindVariables,
+                                                                            -1).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 28725f0..8a9ab59 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -45,7 +45,9 @@ public class ThriftConversion
         throw new AssertionError();
     }
 
-    public static void rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
+    // We never return, but returning a RuntimeException allows to write "throw rethrow(e)" without java complaining
+    // for methods that have a return value.
+    public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
     {
         if (e instanceof RequestTimeoutException)
             throw toThrift((RequestTimeoutException)e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2250816..2ba21d5 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,5 @@ import org.jboss.netty.buffer.ChannelBuffer;
 public interface CBCodec<T>
 {
     public T decode(ChannelBuffer body, int version);
-    public ChannelBuffer encode(T t);
+    public ChannelBuffer encode(T t, int version);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 8e2d765..2f3f3bd 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -35,14 +35,7 @@ import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.transport.messages.CredentialsMessage;
-import org.apache.cassandra.transport.messages.ExecuteMessage;
-import org.apache.cassandra.transport.messages.OptionsMessage;
-import org.apache.cassandra.transport.messages.PrepareMessage;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-import org.apache.cassandra.transport.messages.AuthResponse;
-import org.apache.cassandra.transport.messages.StartupMessage;
+import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.utils.Hex;
 
 import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
@@ -116,8 +109,37 @@ public class Client extends SimpleClient
         }
         else if (msgType.equals("QUERY"))
         {
-            String query = line.substring(6);
-            return new QueryMessage(query, ConsistencyLevel.ONE);
+            line = line.substring(6);
+            // Ugly hack to allow setting a page size, but that's playground code anyway
+            String query = line;
+            int pageSize = -1;
+            if (line.matches(".+ !\\d+$"))
+            {
+                int idx = line.lastIndexOf('!');
+                query = line.substring(0, idx-1);
+                try
+                {
+                    pageSize = Integer.parseInt(line.substring(idx+1, line.length()));
+                }
+                catch (NumberFormatException e)
+                {
+                    return null;
+                }
+            }
+            return new QueryMessage(query, Collections.<ByteBuffer>emptyList(), ConsistencyLevel.ONE, pageSize);
+        }
+        else if (msgType.equals("NEXT"))
+        {
+            line = line.substring(5);
+            try
+            {
+                int pageSize = Integer.parseInt(line);
+                return new NextMessage(pageSize);
+            }
+            catch (NumberFormatException e)
+            {
+                return null;
+            }
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -145,7 +167,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
+                return new ExecuteMessage(id, values, ConsistencyLevel.ONE, -1);
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 54da6a2..eca3697 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -73,7 +73,8 @@ public abstract class Message
         BATCH          (13, Direction.REQUEST,  BatchMessage.codec),
         AUTH_CHALLENGE (14, Direction.RESPONSE, AuthChallenge.codec),
         AUTH_RESPONSE  (15, Direction.REQUEST,  AuthResponse.codec),
-        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec);
+        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec),
+        NEXT           (17, Direction.REQUEST,  NextMessage.codec);
 
         public final int opcode;
         public final Direction direction;
@@ -298,11 +299,11 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 ServerConnection connection = (ServerConnection)request.connection();
-                connection.validateNewMessage(request.type, request.getVersion());
+                QueryState qstate = connection.validateNewMessage(request.type, request.getVersion(), request.getStreamId());
 
                 logger.debug("Received: {}, v={}", request, request.getVersion());
 
-                Response response = request.execute(connection.getQueryState(request.getStreamId()));
+                Response response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
                 response.setVersion(request.getVersion());
                 response.attach(connection);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index ec99440..538258d 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -53,7 +53,7 @@ public class ServerConnection extends Connection
         this.state = State.UNINITIALIZED;
     }
 
-    public QueryState getQueryState(int streamId)
+    private QueryState getQueryState(int streamId)
     {
         QueryState qState = queryStates.get(streamId);
         if (qState == null)
@@ -66,7 +66,7 @@ public class ServerConnection extends Connection
         return qState;
     }
 
-    public void validateNewMessage(Message.Type type, int version)
+    public QueryState validateNewMessage(Message.Type type, int version, int streamId)
     {
         switch (state)
         {
@@ -86,6 +86,20 @@ public class ServerConnection extends Connection
             default:
                 throw new AssertionError();
         }
+
+        QueryState qstate = getQueryState(streamId);
+        if (qstate.hasPager())
+        {
+            if (type != Message.Type.NEXT)
+                qstate.dropPager();
+        }
+        else
+        {
+            if (type == Message.Type.NEXT)
+                throw new ProtocolException("Unexpected NEXT message, paging is not set (or is done)");
+        }
+
+        return qstate;
     }
 
     public void applyStateTransition(Message.Type requestType, Message.Type responseType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 993a490..3a9c286 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -156,7 +156,7 @@ public class SimpleClient
 
     public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
     {
-        Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel));
+        Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel, -1));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -170,7 +170,7 @@ public class SimpleClient
 
     public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
+        Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency, -1));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index bc90dc5..63df7d0 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -40,7 +40,7 @@ public class AuthChallenge extends Message.Response
         }
 
         @Override
-        public ChannelBuffer encode(AuthChallenge challenge)
+        public ChannelBuffer encode(AuthChallenge challenge, int version)
         {
             return CBUtil.valueToCB(challenge.token);
         }
@@ -57,7 +57,7 @@ public class AuthChallenge extends Message.Response
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 1f8ed9f..8a33a72 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -52,7 +52,7 @@ public class AuthResponse extends Message.Request
         }
 
         @Override
-        public ChannelBuffer encode(AuthResponse response)
+        public ChannelBuffer encode(AuthResponse response, int version)
         {
             return CBUtil.valueToCB(response.token);
         }
@@ -69,7 +69,7 @@ public class AuthResponse extends Message.Request
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index ba520bc..13c750a 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -43,7 +43,7 @@ public class AuthSuccess extends Message.Response
         }
 
         @Override
-        public ChannelBuffer encode(AuthSuccess success)
+        public ChannelBuffer encode(AuthSuccess success, int version)
         {
             return CBUtil.valueToCB(success.token);
         }
@@ -60,7 +60,7 @@ public class AuthSuccess extends Message.Response
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index d781f68..292f748 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,7 +35,7 @@ public class AuthenticateMessage extends Message.Response
             return new AuthenticateMessage(authenticator);
         }
 
-        public ChannelBuffer encode(AuthenticateMessage msg)
+        public ChannelBuffer encode(AuthenticateMessage msg, int version)
         {
             return CBUtil.stringToCB(msg.authenticator);
         }
@@ -51,7 +51,7 @@ public class AuthenticateMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 299d8b8..9fb4482 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -73,7 +73,7 @@ public class BatchMessage extends Message.Request
             return new BatchMessage(toType(type), queryOrIds, variables, consistency);
         }
 
-        public ChannelBuffer encode(BatchMessage msg)
+        public ChannelBuffer encode(BatchMessage msg, int version)
         {
             // We have:
             //   - type
@@ -160,7 +160,7 @@ public class BatchMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index ceff5ba..207907a 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -55,7 +55,7 @@ public class CredentialsMessage extends Message.Request
             return msg;
         }
 
-        public ChannelBuffer encode(CredentialsMessage msg)
+        public ChannelBuffer encode(CredentialsMessage msg, int version)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
 
@@ -78,7 +78,7 @@ public class CredentialsMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 3243bce..3675f08 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,7 +123,7 @@ public class ErrorMessage extends Message.Response
             return new ErrorMessage(te);
         }
 
-        public ChannelBuffer encode(ErrorMessage msg)
+        public ChannelBuffer encode(ErrorMessage msg, int version)
         {
             ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
             ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
@@ -213,7 +213,7 @@ public class ErrorMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 7d67de9..f7a93ae 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,7 +31,7 @@ public class EventMessage extends Message.Response
             return new EventMessage(Event.deserialize(body));
         }
 
-        public ChannelBuffer encode(EventMessage msg)
+        public ChannelBuffer encode(EventMessage msg, int version)
         {
             return msg.event.serialize();
         }
@@ -48,7 +48,7 @@ public class EventMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 8e2b761..7c35e42 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import com.google.common.collect.ImmutableMap;
 import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.cassandra.cql3.CQLStatement;
@@ -49,10 +50,11 @@ public class ExecuteMessage extends Message.Request
                 values.add(CBUtil.readValue(body));
 
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-            return new ExecuteMessage(id, values, consistency);
+            int resultPageSize = body.readInt();
+            return new ExecuteMessage(id, values, consistency, resultPageSize);
         }
 
-        public ChannelBuffer encode(ExecuteMessage msg)
+        public ChannelBuffer encode(ExecuteMessage msg, int version)
         {
             // We have:
             //   - statementId
@@ -60,7 +62,7 @@ public class ExecuteMessage extends Message.Request
             //   - The values
             //   - options
             int vs = msg.values.size();
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(4, 0, vs);
             builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
             builder.add(CBUtil.shortToCB(vs));
 
@@ -69,6 +71,7 @@ public class ExecuteMessage extends Message.Request
                 builder.addValue(value);
 
             builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+            builder.add(CBUtil.intToCB(msg.resultPageSize));
             return builder.build();
         }
     };
@@ -76,23 +79,25 @@ public class ExecuteMessage extends Message.Request
     public final MD5Digest statementId;
     public final List<ByteBuffer> values;
     public final ConsistencyLevel consistency;
+    public final int resultPageSize;
 
-    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
-        this(MD5Digest.wrap(statementId), values, consistency);
+        this(MD5Digest.wrap(statementId), values, consistency, resultPageSize);
     }
 
-    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
         this.values = values;
         this.consistency = consistency;
+        this.resultPageSize = resultPageSize;
     }
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)
@@ -104,6 +109,9 @@ public class ExecuteMessage extends Message.Request
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
+            if (resultPageSize == 0)
+                throw new ProtocolException("The page size cannot be 0");
+
             UUID tracingId = null;
             if (isTracingRequested())
             {
@@ -114,11 +122,16 @@ public class ExecuteMessage extends Message.Request
             if (state.traceNextQuery())
             {
                 state.createTracingSession();
+
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                if (resultPageSize > 0)
+                    builder.put("page_size", Integer.toString(resultPageSize));
+
                 // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
-                Tracing.instance.begin("Execute CQL3 prepared query", Collections.<String, String>emptyMap());
+                Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values);
+            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -132,6 +145,9 @@ public class ExecuteMessage extends Message.Request
         finally
         {
             Tracing.instance.stopSession();
+            // Trash the current session id if we won't need it anymore
+            if (!state.hasPager())
+                state.getAndResetCurrentTracingSession();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/NextMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/NextMessage.java b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
new file mode 100644
index 0000000..d68f603
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class NextMessage extends Message.Request
+{
+    public static final Message.Codec<NextMessage> codec = new Message.Codec<NextMessage>()
+    {
+        public NextMessage decode(ChannelBuffer body, int version)
+        {
+            int resultPageSize = body.readInt();
+            return new NextMessage(resultPageSize);
+        }
+
+        public ChannelBuffer encode(NextMessage msg, int version)
+        {
+            return CBUtil.intToCB(msg.resultPageSize);
+        }
+    };
+
+    public final int resultPageSize;
+
+    public NextMessage(int resultPageSize)
+    {
+        super(Type.NEXT);
+        this.resultPageSize = resultPageSize;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this, getVersion());
+    }
+
+    public Message.Response execute(QueryState state)
+    {
+        try
+        {
+            if (resultPageSize == 0)
+                throw new ProtocolException("The page size cannot be 0");
+
+            /*
+             * If we had traced the previous page and we are asked to trace this one,
+             * record the previous id to allow linking the trace together.
+             */
+            UUID previousTracingId = state.getAndResetCurrentTracingSession();
+
+            UUID tracingId = null;
+            if (isTracingRequested())
+            {
+                tracingId = UUIDGen.getTimeUUID();
+                state.prepareTracingSession(tracingId);
+            }
+
+            if (state.traceNextQuery())
+            {
+                state.createTracingSession();
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                if (resultPageSize > 0)
+                    builder.put("page_size", Integer.toString(resultPageSize));
+                if (previousTracingId != null)
+                    builder.put("previous_trace", previousTracingId.toString());
+                Tracing.instance.begin("Continue paged CQL3 query", builder.build());
+            }
+
+            Message.Response response = state.getNextPage(resultPageSize < 0 ? Integer.MAX_VALUE : resultPageSize);
+
+            if (tracingId != null)
+                response.setTracingId(tracingId);
+
+            return response;
+        }
+        catch (Exception e)
+        {
+            if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
+                logger.error("Unexpected error during query", e);
+            return ErrorMessage.fromException(e);
+        }
+        finally
+        {
+            Tracing.instance.stopSession();
+            // Trash the current session id if we won't need it anymore
+            if (!state.hasPager())
+                state.getAndResetCurrentTracingSession();
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "NEXT " + resultPageSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 6e753d3..5afefb5 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,7 +42,7 @@ public class OptionsMessage extends Message.Request
             return new OptionsMessage();
         }
 
-        public ChannelBuffer encode(OptionsMessage msg)
+        public ChannelBuffer encode(OptionsMessage msg, int version)
         {
             return ChannelBuffers.EMPTY_BUFFER;
         }
@@ -55,7 +55,7 @@ public class OptionsMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 3e7fe51..851f3f8 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,7 +38,7 @@ public class PrepareMessage extends Message.Request
             return new PrepareMessage(query);
         }
 
-        public ChannelBuffer encode(PrepareMessage msg)
+        public ChannelBuffer encode(PrepareMessage msg, int version)
         {
             return CBUtil.longStringToCB(msg.query);
         }
@@ -54,7 +54,7 @@ public class PrepareMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)


[3/4] Add auto paging capability to the native protocol

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index c9f715c..c02da1d 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -45,18 +45,15 @@ public class RowIteratorFactory
      * and filtered by the queryfilter.
      * @param memtables Memtables pending flush.
      * @param sstables SStables to scan through.
-     * @param startWith Start at this key
-     * @param stopAt Stop and this key
-     * @param filter Used to decide which columns to pull out
+     * @param range The data range to fetch
      * @param cfs
      * @return A row iterator following all the given restrictions
      */
     public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
                                                      final Collection<SSTableReader> sstables,
-                                                     final RowPosition startWith,
-                                                     final RowPosition stopAt,
-                                                     final QueryFilter filter,
-                                                     final ColumnFamilyStore cfs)
+                                                     final DataRange range,
+                                                     final ColumnFamilyStore cfs,
+                                                     final long now)
     {
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
         final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -64,19 +61,19 @@ public class RowIteratorFactory
         // memtables
         for (Memtable memtable : memtables)
         {
-            iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(filter, memtable.getEntryIterator(startWith, stopAt)));
+            iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
         }
 
         for (SSTableReader sstable : sstables)
         {
-            final SSTableScanner scanner = sstable.getScanner(filter, startWith);
+            final SSTableScanner scanner = sstable.getScanner(range);
             iterators.add(scanner);
         }
 
         // reduce rows from all sources into a single row
         return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
         {
-            private final int gcBefore = cfs.gcBefore(filter.timestamp);
+            private final int gcBefore = cfs.gcBefore(now);
             private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
             private DecoratedKey key;
             private ColumnFamily returnCF;
@@ -96,17 +93,16 @@ public class RowIteratorFactory
 
             protected Row getReduced()
             {
-
                 // First check if this row is in the rowCache. If it is we can skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
                 if (cached == null)
                 {
                     // not cached: collate
-                    filter.collateOnDiskAtom(returnCF, colIters, gcBefore);
+                    QueryFilter.collateOnDiskAtom(returnCF, colIters, range.columnFilter(key.key), gcBefore, now);
                 }
                 else
                 {
-                    QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+                    QueryFilter keyFilter = new QueryFilter(key, cfs.name, range.columnFilter(key.key), now);
                     returnCF = cfs.filterColumnFamily(cached, keyFilter);
                 }
 
@@ -123,12 +119,12 @@ public class RowIteratorFactory
      */
     private static class ConvertToColumnIterator<T extends ColumnFamily> implements CloseableIterator<OnDiskAtomIterator>
     {
-        private final QueryFilter filter;
+        private final DataRange range;
         private final Iterator<Map.Entry<DecoratedKey, T>> iter;
 
-        public ConvertToColumnIterator(QueryFilter filter, Iterator<Map.Entry<DecoratedKey, T>> iter)
+        public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, T>> iter)
         {
-            this.filter = filter;
+            this.range = range;
             this.iter = iter;
         }
 
@@ -151,7 +147,7 @@ public class RowIteratorFactory
             {
                 public OnDiskAtomIterator create()
                 {
-                    return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey());
+                    return range.columnFilter(entry.getKey().key).getColumnFamilyIterator(entry.getKey(), entry.getValue());
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 5c42de5..508d1d2 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -106,6 +106,11 @@ public class SliceFromReadCommand extends ReadCommand
         return filter;
     }
 
+    public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
+    {
+        return new SliceFromReadCommand(table, key, cfName, timestamp, newFilter);
+    }
+
     /**
      * The original number of columns requested by the user.
      * This can be different from count when the slice command is a retry (see

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
deleted file mode 100644
index c1933ad..0000000
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.filter.*;
-
-public class SliceQueryPager implements Iterator<ColumnFamily>
-{
-    public static final int DEFAULT_PAGE_SIZE = 10000;
-
-    public final ColumnFamilyStore cfs;
-    public final DecoratedKey key;
-
-    private ColumnSlice[] slices;
-    private boolean exhausted;
-
-    public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
-    {
-        this.cfs = cfs;
-        this.key = key;
-        this.slices = slices;
-    }
-
-    // This will *not* do a query
-    public boolean hasNext()
-    {
-        return !exhausted;
-    }
-
-    // This might return an empty column family (but never a null one)
-    public ColumnFamily next()
-    {
-        if (exhausted)
-            return null;
-
-        long now = System.currentTimeMillis();
-        SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
-        QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
-        ColumnFamily cf = cfs.getColumnFamily(filter);
-        if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
-        {
-            exhausted = true;
-        }
-        else
-        {
-            Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
-            Column lastColumn = iter.next();
-            while (lastColumn.isMarkedForDelete(now))
-                lastColumn = iter.next();
-
-            int i = 0;
-            for (; i < slices.length; ++i)
-            {
-                ColumnSlice current = slices[i];
-                if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
-                    break;
-            }
-            if (i >= slices.length)
-                exhausted = true;
-            else
-                slices = Arrays.copyOfRange(slices, i, slices.length);
-        }
-        return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 90bfd8d..8528515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -678,7 +678,8 @@ public class SystemTable
         return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
                                                      null,
                                                      new IdentityQueryFilter(),
-                                                     Integer.MAX_VALUE);
+                                                     Integer.MAX_VALUE,
+                                                     System.currentTimeMillis());
     }
 
     public static Collection<RowMutation> serializeSchema()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 772f842..409076f 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 
 /**
@@ -50,6 +51,7 @@ import org.apache.cassandra.tracing.Tracing;
 public class Table
 {
     public static final String SYSTEM_KS = "system";
+    private static final int DEFAULT_PAGE_SIZE = 10000;
 
     private static final Logger logger = LoggerFactory.getLogger(Table.class);
 
@@ -398,7 +400,7 @@ public class Table
         switchLock.readLock().lock();
         try
         {
-            SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
             while (pager.hasNext())
             {
                 ColumnFamily cf = pager.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index a5d54ce..2fda715 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -61,6 +61,16 @@ public class ColumnCounter
         return ignored;
     }
 
+    public ColumnCounter countAll(ColumnFamily container)
+    {
+        if (container == null)
+            return this;
+
+        for (Column c : container)
+            count(c, container);
+        return this;
+    }
+
     public static class GroupByPrefix extends ColumnCounter
     {
         private final CompositeType type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index af7431c..49503b5 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -89,6 +89,12 @@ public class ColumnSlice
         return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
     }
 
+    public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+    {
+        return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+    }
+
+
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index d061ac2..e0f3ec3 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,47 +47,36 @@ public abstract class ExtendedFilter
 
     public final ColumnFamilyStore cfs;
     public final long timestamp;
-    protected final IDiskAtomFilter originalFilter;
+    public final DataRange dataRange;
     private final int maxResults;
     private final boolean countCQL3Rows;
-    private final boolean isPaging;
+    private volatile int currentLimit;
 
     public static ExtendedFilter create(ColumnFamilyStore cfs,
+                                        DataRange dataRange,
                                         List<IndexExpression> clause,
-                                        IDiskAtomFilter filter,
                                         int maxResults,
-                                        long timestamp,
                                         boolean countCQL3Rows,
-                                        boolean isPaging)
+                                        long timestamp)
     {
         if (clause == null || clause.isEmpty())
-        {
-            return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
-        }
-        else
-        {
-            if (isPaging)
-                throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
-            return cfs.getComparator() instanceof CompositeType
-                 ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
-                 : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
-        }
+            return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp);
+
+        return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp);
     }
 
-    protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
+    protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp)
     {
         assert cfs != null;
-        assert filter != null;
+        assert dataRange != null;
         this.cfs = cfs;
-        this.originalFilter = filter;
+        this.dataRange = dataRange;
         this.maxResults = maxResults;
         this.timestamp = timestamp;
         this.countCQL3Rows = countCQL3Rows;
-        this.isPaging = isPaging;
+        this.currentLimit = maxResults;
         if (countCQL3Rows)
-            originalFilter.updateColumnsLimit(maxResults);
-        if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
-            throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
+            dataRange.updateColumnsLimit(maxResults);
     }
 
     public int maxRows()
@@ -98,37 +89,30 @@ public abstract class ExtendedFilter
         return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
     }
 
-    /**
-     * Update the filter if necessary given the number of column already
-     * fetched.
-     */
-    public void updateFilter(int currentColumnsCount)
+    public int currentLimit()
     {
-        // As soon as we'd done our first call, we want to reset the start column if we're paging
-        if (isPaging)
-            ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
-        if (!countCQL3Rows)
-            return;
+        return currentLimit;
+    }
 
-        int remaining = maxResults - currentColumnsCount;
-        initialFilter().updateColumnsLimit(remaining);
+    public IDiskAtomFilter columnFilter(ByteBuffer key)
+    {
+        return dataRange.columnFilter(key);
     }
 
     public int lastCounted(ColumnFamily data)
     {
-        if (initialFilter() instanceof SliceQueryFilter)
-            return ((SliceQueryFilter)initialFilter()).lastCounted();
-        else
-            return initialFilter().getLiveCount(data, timestamp);
+        return dataRange.getLiveCount(data, timestamp);
     }
 
-    /** The initial filter we'll do our first slice with (either the original or a superset of it) */
-    public abstract IDiskAtomFilter initialFilter();
-
-    public IDiskAtomFilter originalFilter()
+    public void updateFilter(int currentColumnsCount)
     {
-        return originalFilter;
+        if (!countCQL3Rows)
+            return;
+
+        currentLimit = maxResults - currentColumnsCount;
+        // We propagate that limit to the underlying filter so each internal query don't
+        // fetch more than we needs it to.
+        dataRange.updateColumnsLimit(currentLimit);
     }
 
     public abstract List<IndexExpression> getClause();
@@ -138,18 +122,18 @@ public abstract class ExtendedFilter
      * @param data the data retrieve by the initial filter
      * @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
      */
-    public abstract IDiskAtomFilter getExtraFilter(ColumnFamily data);
+    public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data);
 
     /**
      * @return data pruned down to the columns originally asked for
      */
-    public abstract ColumnFamily prune(ColumnFamily data);
+    public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
 
     /**
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder);
+    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
 
     public static boolean satisfies(int comparison, IndexOperator op)
     {
@@ -170,44 +154,54 @@ public abstract class ExtendedFilter
         }
     }
 
-    private static class FilterWithClauses extends ExtendedFilter
+    public static class WithClauses extends ExtendedFilter
     {
-        protected final List<IndexExpression> clause;
-        protected final IDiskAtomFilter initialFilter;
-
-        public FilterWithClauses(ColumnFamilyStore cfs,
-                                 List<IndexExpression> clause,
-                                 IDiskAtomFilter filter,
-                                 int maxResults,
-                                 long timestamp,
-                                 boolean countCQL3Rows)
+        private final List<IndexExpression> clause;
+        private final IDiskAtomFilter optimizedFilter;
+
+        public WithClauses(ColumnFamilyStore cfs,
+                           DataRange range,
+                           List<IndexExpression> clause,
+                           int maxResults,
+                           boolean countCQL3Rows,
+                           long timestamp)
         {
-            super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
+            super(cfs, range, maxResults, countCQL3Rows, timestamp);
             assert clause != null;
             this.clause = clause;
-            this.initialFilter = computeInitialFilter();
+            this.optimizedFilter = computeOptimizedFilter();
         }
 
-        /** Sets up the initial filter. */
-        protected IDiskAtomFilter computeInitialFilter()
+        /*
+         * Potentially optimize the column filter if we have a change to make it catch all clauses
+         * right away.
+         */
+        private IDiskAtomFilter computeOptimizedFilter()
         {
-            if (originalFilter instanceof SliceQueryFilter)
+            /*
+             * We shouldn't do the "optimization" for composites as the index names are not valid column names 
+             * (which the rest of the method assumes). Said optimization is not useful for composites anyway.
+             * We also don't want to do for paging ranges as the actual filter depends on the row key (it would
+             * probably be possible to make it work but we won't really use it so we don't bother).
+             */
+            if (cfs.getComparator() instanceof CompositeType || dataRange instanceof DataRange.Paging)
+                return null;
+
+            IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
+            if (filter instanceof SliceQueryFilter)
             {
                 // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
                 // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
                 if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
                 {
                     logger.trace("Expanding slice filter to entire row to cover additional expressions");
-                    return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                ((SliceQueryFilter) originalFilter).reversed,
-                                                Integer.MAX_VALUE);
+                    return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
                 }
             }
             else
             {
                 logger.trace("adding columns to original Filter to cover additional expressions");
-                assert originalFilter instanceof NamesQueryFilter;
+                assert filter instanceof NamesQueryFilter;
                 if (!clause.isEmpty())
                 {
                     SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
@@ -215,16 +209,17 @@ public abstract class ExtendedFilter
                     {
                         columns.add(expr.column_name);
                     }
-                    columns.addAll(((NamesQueryFilter) originalFilter).columns);
-                    return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns);
+                    columns.addAll(((NamesQueryFilter) filter).columns);
+                    return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
                 }
             }
-            return originalFilter;
+            return null;
         }
 
-        public IDiskAtomFilter initialFilter()
+        @Override
+        public IDiskAtomFilter columnFilter(ByteBuffer key)
         {
-            return initialFilter;
+            return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
         }
 
         public List<IndexExpression> getClause()
@@ -233,21 +228,13 @@ public abstract class ExtendedFilter
         }
 
         /*
-         * We may need an extra query only if the original was a slice query (and thus may have miss the expression for the clause).
-         * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
-         * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+         * We may need an extra query only if the original query wasn't selecting the row entirely.
+         * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
          */
-        private boolean needsExtraQuery(ColumnFamily data)
+        private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
         {
-            if (!(originalFilter instanceof SliceQueryFilter))
-                return false;
-
-            SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
-            // Check if we've fetch the whole row
-            if (filter.slices.length == 1
-             && filter.start().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-             && filter.finish().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
-             && filter.count == Integer.MAX_VALUE)
+            IDiskAtomFilter filter = columnFilter(rowKey);
+            if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
                 return false;
 
             for (IndexExpression expr : clause)
@@ -261,9 +248,18 @@ public abstract class ExtendedFilter
             return false;
         }
 
-        public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+        public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
         {
-            if (!needsExtraQuery(data))
+            /*
+             * This method assumes the IndexExpression names are valid column names, which is not the
+             * case with composites. This is ok for now however since:
+             * 1) CompositeSearcher doesn't use it.
+             * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
+             * called by CFS.filter() for composites).
+             */
+            assert !(cfs.getComparator() instanceof CompositeType);
+
+            if (!needsExtraQuery(rowKey.key, data))
                 return null;
 
             // Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
@@ -278,17 +274,19 @@ public abstract class ExtendedFilter
             return new NamesQueryFilter(columns);
         }
 
-        public ColumnFamily prune(ColumnFamily data)
+        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
         {
-            if (initialFilter == originalFilter)
+            if (optimizedFilter == null)
                 return data;
+
             ColumnFamily pruned = data.cloneMeShallow();
-            OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
-            originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+            IDiskAtomFilter filter = dataRange.columnFilter(rowKey.key);
+            OnDiskAtomIterator iter = filter.getColumnFamilyIterator(rowKey, data);
+            filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
             return pruned;
         }
 
-        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
             // where the index returned a row which doesn't have the primary column when we actually read it
@@ -310,7 +308,7 @@ public abstract class ExtendedFilter
                 }
                 else
                 {
-                    dataValue = extractDataValue(def, rowKey, data, builder);
+                    dataValue = extractDataValue(def, rowKey.key, data, builder);
                     validator = def.getValidator();
                 }
 
@@ -346,68 +344,29 @@ public abstract class ExtendedFilter
         }
     }
 
-    private static class FilterWithCompositeClauses extends FilterWithClauses
-    {
-        public FilterWithCompositeClauses(ColumnFamilyStore cfs,
-                                          List<IndexExpression> clause,
-                                          IDiskAtomFilter filter,
-                                          int maxResults,
-                                          long timestamp,
-                                          boolean countCQL3Rows)
-        {
-            super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
-        }
-
-        /*
-         * For composites, the index name is not a valid column name (it's only
-         * one of the component), which means we should not do the
-         * NamesQueryFilter part of FilterWithClauses in particular.
-         * Besides, CompositesSearcher doesn't really use the initial filter
-         * expect to know the limit set by the user, so create a fake filter
-         * with only the count information.
-         */
-        protected IDiskAtomFilter computeInitialFilter()
-        {
-            int limit = originalFilter instanceof SliceQueryFilter
-                      ? ((SliceQueryFilter)originalFilter).count
-                      : Integer.MAX_VALUE;
-            return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
-        }
-    }
-
     private static class EmptyClauseFilter extends ExtendedFilter
     {
-        public EmptyClauseFilter(ColumnFamilyStore cfs,
-                                 IDiskAtomFilter filter,
-                                 int maxResults,
-                                 long timestamp,
-                                 boolean countCQL3Rows,
-                                 boolean isPaging)
-        {
-            super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
-        }
-
-        public IDiskAtomFilter initialFilter()
+        public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
         {
-            return originalFilter;
+            super(cfs, range, maxResults, countCQL3Rows, timestamp);
         }
 
         public List<IndexExpression> getClause()
         {
-            throw new UnsupportedOperationException();
+            return Collections.<IndexExpression>emptyList();
         }
 
-        public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+        public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
         {
             return null;
         }
 
-        public ColumnFamily prune(ColumnFamily data)
+        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
         {
             return data;
         }
 
-        public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 35f71e5..69a8950 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -41,10 +41,10 @@ import org.apache.cassandra.io.util.FileDataInput;
 public interface IDiskAtomFilter
 {
     /**
-     * returns an iterator that returns columns from the given memtable
+     * returns an iterator that returns columns from the given columnFamily
      * matching the Filter criteria in sorted order.
      */
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+    public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf);
 
     /**
      * Get an iterator that returns columns from the given SSTable using the opened file
@@ -74,6 +74,7 @@ public interface IDiskAtomFilter
     public void updateColumnsLimit(int newLimit);
 
     public int getLiveCount(ColumnFamily cf, long now);
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now);
 
     public IDiskAtomFilter cloneShallow();
     public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 297f227..d3f057d 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -27,6 +27,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -75,9 +76,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
        return new NamesQueryFilter(newColumns, countCQL3Rows);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf)
     {
-        return Memtable.getNamesIterator(key, cf, this);
+        assert cf != null;
+        return new ByNameColumnIterator(columns.iterator(), cf, key);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -120,6 +122,8 @@ public class NamesQueryFilter implements IDiskAtomFilter
 
     public int getLiveCount(ColumnFamily cf, long now)
     {
+        // Note: we could use columnCounter() but we save the object allocation as it's simple enough
+
         if (countCQL3Rows)
             return cf.hasOnlyTombstones(now) ? 0 : 1;
 
@@ -147,6 +151,56 @@ public class NamesQueryFilter implements IDiskAtomFilter
         return true;
     }
 
+    public boolean countCQL3Rows()
+    {
+        return countCQL3Rows;
+    }
+
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+    {
+        return countCQL3Rows
+             ? new ColumnCounter.GroupByPrefix(now, null, 0)
+             : new ColumnCounter(now);
+    }
+
+    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+    {
+        private final ColumnFamily cf;
+        private final DecoratedKey key;
+        private final Iterator<ByteBuffer> iter;
+
+        public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
+        {
+            this.iter = iter;
+            this.cf = cf;
+            this.key = key;
+        }
+
+        public ColumnFamily getColumnFamily()
+        {
+            return cf;
+        }
+
+        public DecoratedKey getKey()
+        {
+            return key;
+        }
+
+        protected OnDiskAtom computeNext()
+        {
+            while (iter.hasNext())
+            {
+                ByteBuffer current = iter.next();
+                Column column = cf.getColumn(current);
+                if (column != null)
+                    return column;
+            }
+            return endOfData();
+        }
+
+        public void close() throws IOException { }
+    }
+
     public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
     {
         public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index ac0c632..4f71f3a 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -48,13 +48,13 @@ public class QueryFilter
         ColumnFamily cf = memtable.getColumnFamily(key);
         if (cf == null)
             return null;
-        return getMemtableColumnIterator(cf, key);
+        return getColumnFamilyIterator(cf);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(ColumnFamily cf)
     {
         assert cf != null;
-        return filter.getMemtableColumnIterator(cf, key);
+        return filter.getColumnFamilyIterator(key, cf);
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
@@ -69,10 +69,15 @@ public class QueryFilter
 
     public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, final int gcBefore)
     {
+        collateOnDiskAtom(returnCF, toCollate, filter, gcBefore, timestamp);
+    }
+
+    public static void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
+    {
         List<Iterator<Column>> filteredIterators = new ArrayList<Iterator<Column>>(toCollate.size());
         for (Iterator<? extends OnDiskAtom> iter : toCollate)
             filteredIterators.add(gatherTombstones(returnCF, iter));
-        collateColumns(returnCF, filteredIterators, gcBefore);
+        collateColumns(returnCF, filteredIterators, filter, gcBefore, timestamp);
     }
 
     /**
@@ -84,7 +89,12 @@ public class QueryFilter
         filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
     }
 
-    public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
+    public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, int gcBefore)
+    {
+        collateColumns(returnCF, toCollate, filter, gcBefore, timestamp);
+    }
+
+    public static void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
     {
         final Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
         // define a 'reduced' iterator that merges columns w/ the same name, which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index b76ce04..9cbc49e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -95,14 +96,74 @@ public class SliceQueryFilter implements IDiskAtomFilter
         return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
     }
 
+    public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+    {
+        Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+
+        List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+        boolean pastNewStart = false;
+        for (int i = 0; i < slices.length; i++)
+        {
+            ColumnSlice slice = slices[i];
+
+            if (pastNewStart)
+            {
+                newSlices.add(slice);
+                continue;
+            }
+
+            if (slices[i].isBefore(cmp, newStart))
+                continue;
+
+            if (slice.includes(cmp, newStart))
+                newSlices.add(new ColumnSlice(newStart, slice.finish));
+            else
+                newSlices.add(slice);
+
+            pastNewStart = true;
+        }
+        return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
+    }
+
     public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
     {
         return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
     }
 
-    public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+    public OnDiskAtomIterator getColumnFamilyIterator(final DecoratedKey key, final ColumnFamily cf)
     {
-        return Memtable.getSliceIterator(key, cf, this);
+        assert cf != null;
+        final Iterator<Column> filteredIter = reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
+
+        return new OnDiskAtomIterator()
+        {
+            public ColumnFamily getColumnFamily()
+            {
+                return cf;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return key;
+            }
+
+            public boolean hasNext()
+            {
+                return filteredIter.hasNext();
+            }
+
+            public OnDiskAtom next()
+            {
+                return filteredIter.next();
+            }
+
+            public void close() throws IOException { }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 
     public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -122,7 +183,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
     {
-        columnCounter = getColumnCounter(container, now);
+        columnCounter = columnCounter(container.getComparator(), now);
 
         while (reducedColumns.hasNext())
         {
@@ -144,15 +205,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public int getLiveCount(ColumnFamily cf, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf, now);
-        for (Column column : cf)
-            counter.count(column, cf);
-        return counter.live();
+        return columnCounter(cf.getComparator(), now).countAll(cf).live();
     }
 
-    private ColumnCounter getColumnCounter(ColumnFamily container, long now)
+    public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
     {
-        AbstractType<?> comparator = container.getComparator();
         if (compositesToGroup < 0)
             return new ColumnCounter(now);
         else if (compositesToGroup == 0)
@@ -163,7 +220,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
 
     public void trim(ColumnFamily cf, int trimTo, long now)
     {
-        ColumnCounter counter = getColumnCounter(cf, now);
+        ColumnCounter counter = columnCounter(cf.getComparator(), now);
 
         Collection<Column> columns = reversed
                                    ? cf.getReverseSortedColumns()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 17ac81f..a40f4bd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -514,14 +515,9 @@ public class SecondaryIndexManager
      * @param columnFilter the column range to restrict to
      * @return found indexed rows
      */
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
 
         if (indexSearchers.isEmpty())
             return Collections.emptyList();
@@ -530,7 +526,7 @@ public class SecondaryIndexManager
         if (indexSearchers.size() > 1)
             throw new RuntimeException("Unable to search across multiple secondary index types");
 
-        return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+        return indexSearchers.get(0).search(filter);
     }
 
     public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index ddd79dd..d28afc0 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -21,8 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 
@@ -39,12 +38,7 @@ public abstract class SecondaryIndexSearcher
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public abstract List<Row> search(AbstractBounds<RowPosition> range,
-                                     List<IndexExpression> clause,
-                                     IDiskAtomFilter dataFilter,
-                                     int maxResults,
-                                     long now,
-                                     boolean countCQL3Rows);
+    public abstract List<Row> search(ExtendedFilter filter);
 
     /**
      * @return true this index is able to handle given clauses.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e8c0a09..f9b7b11 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -44,16 +44,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
-        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+        assert filter.getClause() != null && !filter.getClause().isEmpty();
+        return baseCfs.filter(getIndexedIterator(filter), filter);
     }
 
     private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
@@ -62,10 +56,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
             return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         ColumnNameBuilder builder;
-        if (filter.originalFilter() instanceof SliceQueryFilter)
+        IDiskAtomFilter columnFilter = filter.columnFilter(key);
+        if (columnFilter instanceof SliceQueryFilter)
         {
-            SliceQueryFilter originalFilter = (SliceQueryFilter)filter.originalFilter();
-            builder = index.makeIndexColumnNameBuilder(key, isStart ? originalFilter.start() : originalFilter.finish());
+            SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
+            builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
         }
         else
         {
@@ -74,7 +69,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
         return isStart ? builder.build() : builder.buildAsEndOfRange();
     }
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
     {
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
@@ -93,6 +88,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
          * possible key having a given token. A fix would be to actually store the token along the key in the
          * indexed row.
          */
+        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
         ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -140,12 +136,12 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                 DecoratedKey currentKey = null;
                 ColumnFamily data = null;
                 int columnsCount = 0;
-                int limit = ((SliceQueryFilter)filter.initialFilter()).count;
+                int limit = filter.currentLimit();
 
                 while (true)
                 {
                     // Did we got more columns that needed to respect the user limit?
-                    // (but we still need to return was fetch already)
+                    // (but we still need to return what was fetch already)
                     if (columnsCount > limit)
                         return makeReturn(currentKey, data);
 
@@ -235,7 +231,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         // Check if this entry cannot be a hit due to the original column filter
                         ByteBuffer start = entry.indexedEntryStart();
-                        if (!filter.originalFilter().maySelectPrefix(baseComparator, start))
+                        if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
                             continue;
 
                         logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
@@ -256,7 +252,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
 
                         assert newData != null : "An entry with not data should have been considered stale";
 
-                        if (!filter.isSatisfiedBy(dk.key, newData, entry.indexedEntryNameBuilder))
+                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
                             continue;
 
                         if (data == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index e919d8a..205efb7 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -47,20 +47,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
     }
 
     @Override
-    public List<Row> search(AbstractBounds<RowPosition> range,
-                            List<IndexExpression> clause,
-                            IDiskAtomFilter dataFilter,
-                            int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+    public List<Row> search(ExtendedFilter filter)
     {
-        assert clause != null && !clause.isEmpty();
-        ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
-        return baseCfs.filter(getIndexedIterator(range, filter), filter);
+        assert filter.getClause() != null && !filter.getClause().isEmpty();
+        return baseCfs.filter(getIndexedIterator(filter), filter);
     }
 
-    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+    private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
     {
+
         // Start with the most-restrictive indexed clause, then apply remaining clauses
         // to each row matching that clause.
         // TODO: allow merge join instead of just one index + loop
@@ -79,6 +74,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
          * possible key having a given token. A fix would be to actually store the token along the key in the
          * indexed row.
          */
+        final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
         final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
         final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
@@ -165,14 +161,14 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         }
 
                         logger.trace("Returning index hit for {}", dk);
-                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
                         // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
                         if (data == null)
                             data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
 
                         // as in CFS.filter - extend the filter to ensure we include the columns
                         // from the index expressions, just in case they weren't included in the initialFilter
-                        IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+                        IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
                         if (extraFilter != null)
                         {
                             ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 85f2677..6c10d95 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -994,17 +995,12 @@ public class SSTableReader extends SSTable
 
     /**
      *
-     * @param filter filter to use when reading the columns
+     * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public SSTableScanner getScanner(QueryFilter filter)
+    public SSTableScanner getScanner(DataRange dataRange)
     {
-        return new SSTableScanner(this, filter, null);
-    }
-
-    public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
-    {
-        return new SSTableScanner(this, filter, startWith, null);
+        return new SSTableScanner(this, dataRange, null);
     }
 
     /**
@@ -1018,7 +1014,7 @@ public class SSTableReader extends SSTable
 
     public SSTableScanner getScanner(RateLimiter limiter)
     {
-        return new SSTableScanner(this, null, limiter);
+        return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
     }
 
     /**
@@ -1034,7 +1030,7 @@ public class SSTableReader extends SSTable
 
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         if (rangeIterator.hasNext())
-            return new SSTableScanner(this, null, range, limiter);
+            return new SSTableScanner(this, DataRange.forKeyRange(range), limiter);
         else
             return new EmptyCompactionScanner(getFilename());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index fb52a02..66e7189 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import com.google.common.util.concurrent.RateLimiter;
-
 import com.google.common.collect.AbstractIterator;
 
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
@@ -43,43 +44,46 @@ public class SSTableScanner implements ICompactionScanner
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
     public final SSTableReader sstable;
+    private final DataRange dataRange;
+    private final long stopAt;
+
     protected Iterator<OnDiskAtomIterator> iterator;
-    private final QueryFilter filter;
-    private long stopAt;
 
     /**
      * @param sstable SSTable to scan; must not be null
-     * @param filter filter to use when scanning the columns; may be null
+     * @param filter range of data to fetch; must not be null
      * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
+    SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
         assert sstable != null;
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
-        this.filter = filter;
-        stopAt = dfile.length();
+        this.dataRange = dataRange;
+        this.stopAt = computeStopAt();
+        seekToStart();
     }
 
-    public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
+    private void seekToStart()
     {
-        this(sstable, filter, limiter);
+        if (dataRange.startKey().isMinimum(sstable.partitioner))
+            return;
 
-        long indexPosition = sstable.getIndexScanPosition(startWith);
+        long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
         // -1 means the key is before everything in the sstable. So just start from the beginning.
         if (indexPosition == -1)
-            indexPosition = 0;
-        ifile.seek(indexPosition);
+            return;
 
+        ifile.seek(indexPosition);
         try
         {
             while (!ifile.isEOF())
             {
                 indexPosition = ifile.getFilePointer();
                 DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
-                int comparison = indexDecoratedKey.compareTo(startWith);
+                int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
                 if (comparison >= 0)
                 {
                     // Found, just read the dataPosition and seek into index and data files
@@ -102,19 +106,14 @@ public class SSTableScanner implements ICompactionScanner
 
     }
 
-    public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+    private long computeStopAt()
     {
-        this(sstable, filter, range.toRowBounds().left, limiter);
+        AbstractBounds<RowPosition> keyRange = dataRange.keyRange();
+        if (dataRange.stopKey().isMinimum(sstable.partitioner) || (keyRange instanceof Range && ((Range)keyRange).isWrapAround()))
+            return dfile.length();
 
-        if (range.isWrapAround())
-        {
-            stopAt = dfile.length();
-        }
-        else
-        {
-            RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
-            stopAt = position == null ? dfile.length() : position.position;
-        }
+        RowIndexEntry position = sstable.getPosition(keyRange.toRowBounds().right, SSTableReader.Operator.GT);
+        return position == null ? dfile.length() : position.position;
     }
 
     public void close() throws IOException
@@ -202,8 +201,7 @@ public class SSTableScanner implements ICompactionScanner
                 }
 
                 assert !dfile.isEOF();
-
-                if (filter == null)
+                if (dataRange.selectsFullRowFor(currentKey.key))
                 {
                     dfile.seek(currentEntry.position);
                     ByteBufferUtil.readWithShortLength(dfile); // key
@@ -217,7 +215,7 @@ public class SSTableScanner implements ICompactionScanner
                 {
                     public OnDiskAtomIterator create()
                     {
-                        return filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+                        return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
                     }
                 });
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 43e2cb2..4ac408e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -116,6 +116,7 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PREPARE,
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
+        PAGED_RANGE,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -136,6 +137,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ, Stage.READ);
         put(Verb.RANGE_SLICE, Stage.READ);
         put(Verb.INDEX_SCAN, Stage.READ);
+        put(Verb.PAGED_RANGE, Stage.READ);
 
         put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
@@ -188,6 +190,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, RowMutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+        put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
         put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
@@ -216,6 +219,7 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+        put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
         put(Verb.READ, ReadResponse.serializer);
         put(Verb.TRUNCATE, TruncateResponse.serializer);
         put(Verb.SNAPSHOT, null);
@@ -293,6 +297,7 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
                                                                    Verb.RANGE_SLICE,
+                                                                   Verb.PAGED_RANGE,
                                                                    Verb.REQUEST_RESPONSE);
 
     // total dropped message counts for server lifetime

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 6483d9b..38d103d 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -17,9 +17,17 @@
  */
 package org.apache.cassandra.service;
 
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -30,6 +38,7 @@ public class QueryState
     private final ClientState clientState;
     private volatile long clock;
     private volatile UUID preparedTracingSession;
+    private volatile Pager pager;
 
     public QueryState(ClientState clientState)
     {
@@ -68,6 +77,13 @@ public class QueryState
         this.preparedTracingSession = sessionId;
     }
 
+    public UUID getAndResetCurrentTracingSession()
+    {
+        UUID previous = preparedTracingSession;
+        preparedTracingSession = null;
+        return previous;
+    }
+
     public void createTracingSession()
     {
         if (this.preparedTracingSession == null)
@@ -77,9 +93,53 @@ public class QueryState
         else
         {
             UUID session = this.preparedTracingSession;
-            this.preparedTracingSession = null;
             Tracing.instance.newSession(session);
         }
     }
-}
 
+    public void attachPager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+    {
+        pager = new Pager(queryPager, statement, variables);
+    }
+
+    public boolean hasPager()
+    {
+        return pager != null;
+    }
+
+    public void dropPager()
+    {
+        pager = null;
+    }
+
+    public ResultMessage.Rows getNextPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        assert pager != null; // We've already validated (in ServerConnection) that this should not be null
+
+        int currentLimit = pager.queryPager.maxRemaining();
+        List<Row> page = pager.queryPager.fetchPage(pageSize);
+        ResultMessage.Rows msg = pager.statement.processResults(page, pager.variables, currentLimit, pager.queryPager.timestamp());
+
+        if (pager.queryPager.isExhausted())
+            dropPager();
+        else
+            msg.result.metadata.setHasMorePages();
+
+        return msg;
+    }
+
+    // Groups the actual query pager with the Select Query
+    private static class Pager
+    {
+        private final QueryPager queryPager;
+        private final SelectStatement statement;
+        private final List<ByteBuffer> variables;
+
+        private Pager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+        {
+            this.queryPager = queryPager;
+            this.statement = statement;
+            this.variables = variables;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f63fcb1..9a7d1d2 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,39 +19,16 @@ package org.apache.cassandra.service;
 
 import java.util.List;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.AbstractRangeCommand;
 import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.tracing.Tracing;
 
-public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
+public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
 {
-    public static List<Row> executeLocally(RangeSliceCommand command)
-    {
-        ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-        if (cfs.indexManager.hasIndexFor(command.row_filter))
-            return cfs.search(command.range,
-                              command.row_filter,
-                              command.predicate,
-                              command.maxResults,
-                              command.timestamp,
-                              command.countCQL3Rows);
-        else
-            return cfs.getRangeSlice(command.range,
-                                     command.row_filter,
-                                     command.predicate,
-                                     command.maxResults,
-                                     command.timestamp,
-                                     command.countCQL3Rows,
-                                     command.isPaging);
-    }
-
-    public void doVerb(MessageIn<RangeSliceCommand> message, int id)
+    public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
     {
         try
         {
@@ -60,7 +37,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
                 /* Don't service reads! */
                 throw new RuntimeException("Cannot service reads while bootstrapping!");
             }
-            RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
+            RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
             Tracing.trace("Enqueuing response to {}", message.from);
             MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 612f89b..0286bd3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1294,11 +1294,11 @@ public class StorageProxy implements StorageProxyMBean
 
     static class LocalRangeSliceRunnable extends DroppableRunnable
     {
-        private final RangeSliceCommand command;
+        private final AbstractRangeCommand command;
         private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
         private final long start = System.nanoTime();
 
-        LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+        LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
         {
             super(MessagingService.Verb.READ);
             this.command = command;
@@ -1307,7 +1307,7 @@ public class StorageProxy implements StorageProxyMBean
 
         protected void runMayThrow()
         {
-            RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+            RangeSliceReply result = new RangeSliceReply(command.executeLocally());
             MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             handler.response(result);
         }
@@ -1337,7 +1337,7 @@ public class StorageProxy implements StorageProxyMBean
         return inter;
     }
 
-    public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
+    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
     throws UnavailableException, ReadTimeoutException
     {
         Tracing.trace("Determining replicas to query");
@@ -1348,11 +1348,9 @@ public class StorageProxy implements StorageProxyMBean
         // now scan until we have enough results
         try
         {
-            IDiskAtomFilter commandPredicate = command.predicate;
-
             int cql3RowCount = 0;
             rows = new ArrayList<Row>();
-            List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+            List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.keyRange);
             int i = 0;
             AbstractBounds<RowPosition> nextRange = null;
             List<InetAddress> nextEndpoints = null;
@@ -1408,15 +1406,7 @@ public class StorageProxy implements StorageProxyMBean
                     ++i;
                 }
 
-                RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
-                                                                  command.column_family,
-                                                                  command.timestamp,
-                                                                  commandPredicate,
-                                                                  range,
-                                                                  command.row_filter,
-                                                                  command.maxResults,
-                                                                  command.countCQL3Rows,
-                                                                  command.isPaging);
+                AbstractRangeCommand nodeCmd = command.forSubRange(range);
 
                 // collect replies and resolve according to consistency level
                 RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
@@ -1431,7 +1421,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
+                    MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
                     for (InetAddress endpoint : filteredEndpoints)
                     {
                         Tracing.trace("Enqueuing request to {}", endpoint);
@@ -1444,8 +1434,8 @@ public class StorageProxy implements StorageProxyMBean
                     for (Row row : handler.get())
                     {
                         rows.add(row);
-                        if (nodeCmd.countCQL3Rows)
-                            cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
+                        if (nodeCmd.countCQL3Rows())
+                            cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
                     }
                     FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
                 }
@@ -1462,18 +1452,9 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // if we're done, great, otherwise, move to the next range
-                int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size();
-                if (count >= nodeCmd.maxResults)
+                int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+                if (count >= nodeCmd.limit())
                     break;
-
-                // if we are paging and already got some rows, reset the column filter predicate,
-                // so we start iterating the next row from the first column
-                if (!rows.isEmpty() && command.isPaging)
-                {
-                    // We only allow paging with a slice filter (doesn't make sense otherwise anyway)
-                    assert commandPredicate instanceof SliceQueryFilter;
-                    commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY);
-                }
             }
         }
         finally
@@ -1483,13 +1464,13 @@ public class StorageProxy implements StorageProxyMBean
         return trim(command, rows);
     }
 
-    private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
+    private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
     {
-        // When countCQL3Rows, we let the caller trim the result.
-        if (command.countCQL3Rows)
+        // When maxIsColumns, we let the caller trim the result.
+        if (command.countCQL3Rows())
             return rows;
         else
-            return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows;
+            return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
     }
 
     /**


[4/4] git commit: Add auto paging capability to the native protocol

Posted by sl...@apache.org.
Add auto paging capability to the native protocol

This also generalize the paging used internally, and pages CQL3 'select
count' operation to avoid OOM.

patch by slebresne; reviewed by iamaleksey for CASSANDRA-4415


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e48ff293
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e48ff293
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e48ff293

Branch: refs/heads/trunk
Commit: e48ff29387547c0837ab381f6e890f8417a1b65c
Parents: 40bc445
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue May 28 10:17:26 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 25 10:30:12 2013 +0200

----------------------------------------------------------------------
 doc/native_protocol_v2.spec                     |  92 +++++-
 src/java/org/apache/cassandra/auth/Auth.java    |   3 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   3 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   3 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |   5 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  14 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |  25 +-
 .../statements/AuthenticationStatement.java     |   2 +-
 .../cql3/statements/AuthorizationStatement.java |   2 +-
 .../cql3/statements/BatchStatement.java         |   2 +-
 .../cql3/statements/ModificationStatement.java  |   2 +-
 .../statements/SchemaAlteringStatement.java     |   2 +-
 .../cql3/statements/SelectStatement.java        |  72 ++++-
 .../cql3/statements/TruncateStatement.java      |   2 +-
 .../cassandra/cql3/statements/UseStatement.java |   2 +-
 .../cassandra/db/AbstractRangeCommand.java      |  67 +++++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 123 ++++++--
 src/java/org/apache/cassandra/db/DataRange.java | 218 ++++++++++++++
 .../cassandra/db/HintedHandOffManager.java      |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  85 ------
 .../apache/cassandra/db/PagedRangeCommand.java  | 196 +++++++++++++
 .../apache/cassandra/db/RangeSliceCommand.java  | 115 +++++---
 .../org/apache/cassandra/db/ReadCommand.java    |   3 +-
 .../apache/cassandra/db/RowIteratorFactory.java |  30 +-
 .../cassandra/db/SliceFromReadCommand.java      |   5 +
 .../apache/cassandra/db/SliceQueryPager.java    |  88 ------
 .../org/apache/cassandra/db/SystemTable.java    |   3 +-
 src/java/org/apache/cassandra/db/Table.java     |   4 +-
 .../cassandra/db/filter/ColumnCounter.java      |  10 +
 .../apache/cassandra/db/filter/ColumnSlice.java |   6 +
 .../cassandra/db/filter/ExtendedFilter.java     | 237 +++++++---------
 .../cassandra/db/filter/IDiskAtomFilter.java    |   5 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |  58 +++-
 .../apache/cassandra/db/filter/QueryFilter.java |  20 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |  77 ++++-
 .../db/index/SecondaryIndexManager.java         |  12 +-
 .../db/index/SecondaryIndexSearcher.java        |  10 +-
 .../db/index/composites/CompositesSearcher.java |  30 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  20 +-
 .../cassandra/io/sstable/SSTableReader.java     |  16 +-
 .../cassandra/io/sstable/SSTableScanner.java    |  52 ++--
 .../apache/cassandra/net/MessagingService.java  |   5 +
 .../apache/cassandra/service/QueryState.java    |  64 ++++-
 .../service/RangeSliceVerbHandler.java          |  31 +-
 .../apache/cassandra/service/StorageProxy.java  |  49 +---
 .../service/pager/AbstractQueryPager.java       | 245 ++++++++++++++++
 .../service/pager/MultiPartitionPager.java      | 114 ++++++++
 .../service/pager/NamesQueryPager.java          |  94 ++++++
 .../cassandra/service/pager/Pageable.java       |  38 +++
 .../cassandra/service/pager/QueryPager.java     |  75 +++++
 .../cassandra/service/pager/QueryPagers.java    | 184 ++++++++++++
 .../service/pager/RangeNamesQueryPager.java     |  90 ++++++
 .../service/pager/RangeSliceQueryPager.java     |  99 +++++++
 .../service/pager/SinglePartitionPager.java     |  30 ++
 .../service/pager/SliceQueryPager.java          |  72 +++++
 .../cassandra/thrift/CassandraServer.java       | 142 ++++------
 .../cassandra/thrift/ThriftConversion.java      |   4 +-
 .../org/apache/cassandra/transport/CBCodec.java |   2 +-
 .../org/apache/cassandra/transport/Client.java  |  44 ++-
 .../org/apache/cassandra/transport/Message.java |   7 +-
 .../cassandra/transport/ServerConnection.java   |  18 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/AuthChallenge.java       |   4 +-
 .../transport/messages/AuthResponse.java        |   4 +-
 .../transport/messages/AuthSuccess.java         |   4 +-
 .../transport/messages/AuthenticateMessage.java |   4 +-
 .../transport/messages/BatchMessage.java        |   4 +-
 .../transport/messages/CredentialsMessage.java  |   4 +-
 .../transport/messages/ErrorMessage.java        |   4 +-
 .../transport/messages/EventMessage.java        |   4 +-
 .../transport/messages/ExecuteMessage.java      |  34 ++-
 .../transport/messages/NextMessage.java         | 118 ++++++++
 .../transport/messages/OptionsMessage.java      |   4 +-
 .../transport/messages/PrepareMessage.java      |   4 +-
 .../transport/messages/QueryMessage.java        |  36 ++-
 .../transport/messages/ReadyMessage.java        |   4 +-
 .../transport/messages/RegisterMessage.java     |   4 +-
 .../transport/messages/ResultMessage.java       |  28 +-
 .../transport/messages/StartupMessage.java      |   4 +-
 .../transport/messages/SupportedMessage.java    |   4 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 113 +++++---
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |   4 +-
 .../cassandra/service/QueryPagerTest.java       | 283 +++++++++++++++++++
 84 files changed, 2998 insertions(+), 812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 3959a15..2cc771d 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -22,6 +22,7 @@ Table of Contents
       4.1.6. EXECUTE
       4.1.7. BATCH
       4.1.8. REGISTER
+      4.1.9. NEXT
     4.2. Responses
       4.2.1. ERROR
       4.2.2. READY
@@ -38,8 +39,9 @@ Table of Contents
       4.2.8. AUTH_SUCCESS
   5. Compression
   6. Collection types
-  7. Error codes
-  8. Changes from v1
+  7. Result paging
+  8. Error codes
+  9. Changes from v1
 
 
 1. Overview
@@ -98,7 +100,7 @@ Table of Contents
     0x82    Response frame for this protocol version
 
   This document describe the version 2 of the protocol. For the changes made since
-  version 1, see Section 8.
+  version 1, see Section 9.
 
 
 2.2. flags
@@ -165,6 +167,7 @@ Table of Contents
     0x0E    AUTH_CHALLENGE
     0x0F    AUTH_RESPONSE
     0x10    AUTH_SUCCESS
+    0x11    NEXT
 
   Messages are described in Section 4.
 
@@ -276,10 +279,13 @@ Table of Contents
 4.1.4. QUERY
 
   Performs a CQL query. The body of the message must be:
-    <query><consistency>[<n><value_1>...<value_n>]
+    <query><consistency><result_page_size>[<n><value_1>...<value_n>]
   where:
     - <query> the query, [long string].
     - <consistency> is the [consistency] level for the operation.
+    - <result_page_size> is an [int] controlling the desired page size of the
+      result (in CQL3 rows). A negative value disable paging of the result. See the
+      section on paging (Section 7) for more details.
     - optional: <n> [short], the number of following values.
     - optional: <value_1>...<value_n> are [bytes] to use for bound variables in the query.
 
@@ -302,7 +308,7 @@ Table of Contents
 4.1.6. EXECUTE
 
   Executes a prepared query. The body of the message must be:
-    <id><n><value_1>....<value_n><consistency>
+    <id><n><value_1>....<value_n><consistency><result_page_size>
   where:
     - <id> is the prepared query ID. It's the [short bytes] returned as a
       response to a PREPARE message.
@@ -310,6 +316,9 @@ Table of Contents
     - <value_1>...<value_n> are the [bytes] to use for bound variables in the
       prepared query.
     - <consistency> is the [consistency] level for the operation.
+    - <result_page_size> is an [int] controlling the desired page size of the
+      result (in CQL3 rows). A negative value disable paging of the result. See the
+      section on paging (Section 7) for more details.
 
   Note that the consistency is ignored by some (prepared) queries (USE, CREATE,
   ALTER, TRUNCATE, ...).
@@ -360,6 +369,17 @@ Table of Contents
   multiple times the same event messages, wasting bandwidth.
 
 
+4.1.9. NEXT
+
+  Request the next page of result if paging was requested by a QUERY or EXECUTE
+  statement and there is more result to fetch (see Section 7 for more details).
+  The body of a NEXT message is a single [int] indicating the number of maximum
+  rows to return with the next page of results (it is equivalent to the
+  <result_page_size> in a QUERY or EXECUTE message).
+
+  The result to a NEXT message will be a RESULT message.
+
+
 4.2. Responses
 
   This section describes the content of the frame body for the different
@@ -372,7 +392,7 @@ Table of Contents
   Indicates an error processing a request. The body of the message will be an
   error code ([int]) followed by a [string] error message. Then, depending on
   the exception, more content may follow. The error codes are defined in
-  Section 7, along with their additional content if any.
+  Section 8, along with their additional content if any.
 
 
 4.2.2. READY
@@ -450,6 +470,12 @@ Table of Contents
             0x0001    Global_tables_spec: if set, only one table spec (keyspace
                       and table name) is provided as <global_table_spec>. If not
                       set, <global_table_spec> is not present.
+            0x0002    Has_more_pages: indicates whether this is not the last
+                      page of results and more should be retrieve using a NEXT
+                      message. If not set, this is the laste "page" of result
+                      and NEXT cannot and should not be used. If no result
+                      paging has been requested in the QUERY/EXECUTE/BATCH
+                      message, this will never be set.
         - <columns_count> is an [int] representing the number of columns selected
           by the query this result is of. It defines the number of <col_spec_i>
           elements in and the number of element for each row in <rows_content>.
@@ -623,7 +649,52 @@ Table of Contents
           value.
 
 
-7. Error codes
+7. Result paging
+
+  The protocol allows for paging the result of queries. For that, the QUERY and
+  EXECUTE messages have a <result_page_size> value that indicate the desired
+  page size in CQL3 rows.
+
+  If a positive value is provided for <result_page_size>, the result set of the
+  RESULT message returned for the query will contain at most the
+  <result_page_size> first rows of the query result. If that first page of result
+  contains the full result set for the query, the RESULT message (of kind `Rows`)
+  will have the Has_more_pages flag *not* set. However, if some results are not
+  part of the first response, the Has_more_pages flag will be set. In that latter
+  case, more rows of the result can be retrieved by sending a NEXT message *with the
+  same stream id than the initial query*. The NEXT message also contains its own
+  <result_page_size> that control how many of the remaining result rows will be
+  sent in response. If the response to this NEXT message still does not contains
+  the full remainder of the query result set (the Has_more_pages is set once more),
+  another NEXT message can be send for more result, etc...
+
+  If a RESULT message has the Has_more_pages flag set and any other message than
+  a NEXT message is send on the same stream id, the query is cancelled and no more
+  of its result can be retrieved.
+
+  Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+  support paging. For other type of queries, the <result_page_size> value is
+  ignored.
+
+  The <result_page_size> can be set to a negative value to disable paging (in
+  which case the whole result set will be retuned in the first RESULT message,
+  message that will not have the Has_more_pages flag set). The
+  <result_page_size> value cannot be 0.
+
+  Note to client implementors:
+  - While <result_page_size> can be as low as 1, it will likely be detrimental
+    to performance to pick a value too low. A value below 100 is probably too
+    low for most use cases.
+  - Clients should not rely on the actual size of the result set returned to
+    decide if a NEXT message should be issued. Instead, they should always
+    check the Has_more_pages flag (unless they did not enabled paging for the query
+    obviously). Clients should also not assert that no result will have more than
+    <result_page_size> results. While the current implementation always respect
+    the exact value of <result_page_size>, we reserve ourselves the right to return
+    slightly smaller or bigger pages in the future for performance reasons.
+
+
+8. Error codes
 
   The supported error codes are described below:
     0x0000    Server error: something unexpected happened. This indicates a
@@ -715,7 +786,7 @@ Table of Contents
               this host. The rest of the ERROR message body will be [short
               bytes] representing the unknown ID.
 
-8. Changes from v1
+9. Changes from v1
   * Protocol is versioned to allow old client connects to a newer server, if a
     newer client connects to an older server, it needs to check if it gets a
     ProtocolException on connection and try connecting with a lower version.
@@ -727,3 +798,8 @@ Table of Contents
     removed and replaced by a server/client challenges/responses exchanges (done
     through the new AUTH_RESPONSE/AUTH_CHALLENGE messages). See Section 4.2.3 for
     details.
+  * Query paging has been added (Section 7): QUERY and EXECUTE message have an
+    additional <result_page_size> [int], a new NEXT message has been added and
+    the Rows kind of RESULT message has an additional flag. Note that paging is
+    optional, and a client that don't want to handle it can always pass -1 for
+    the <result_page_size> in QUERY and EXECUTE.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index c561aab..559e10c 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -232,7 +232,8 @@ public class Auth
         {
             ResultMessage.Rows rows = selectUserStatement.execute(consistencyForUser(username),
                                                                   new QueryState(new ClientState(true)),
-                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username)));
+                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username)),
+                                                                  -1);
             return new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 396be71..6f490f8 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -74,7 +74,8 @@ public class CassandraAuthorizer implements IAuthorizer
             ResultMessage.Rows rows = authorizeStatement.execute(ConsistencyLevel.ONE,
                                                                  new QueryState(new ClientState(true)),
                                                                  Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
-                                                                                    ByteBufferUtil.bytes(resource.getName())));
+                                                                                    ByteBufferUtil.bytes(resource.getName())),
+                                                                 -1);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 12dbdee..4d37b7e 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -108,7 +108,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
         {
             ResultMessage.Rows rows = authenticateStatement.execute(consistencyForUser(username),
                                                                     new QueryState(new ClientState(true)),
-                                                                    Lists.newArrayList(ByteBufferUtil.bytes(username)));
+                                                                    Lists.newArrayList(ByteBufferUtil.bytes(username)),
+                                                                    -1);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 63f9cc6..a4abaf1 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -51,11 +51,14 @@ public interface CQLStatement
     /**
      * Execute the statement and return the resulting result or null if there is no result.
      *
+     * @param cl the consistency level for the query
      * @param state the current query state
      * @param variables the values for bounded variables. The implementation
      * can assume that each bound term have a corresponding value.
+     * @param pageSize the initial page size for the result set potentially returned. A negative value
+     * means no paging needs to be done. Statements that do not return result sets can ignore this value.
      */
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variante of execute used for internal query against the system tables, and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 1b89fe3..1de985b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -106,30 +106,30 @@ public class QueryProcessor
         }
     }
 
-    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("Process {} @CL.{}", statement, cl);
         ClientState clientState = queryState.getClientState();
         statement.checkAccess(clientState);
         statement.validate(clientState);
-        ResultMessage result = statement.execute(cl, queryState, variables);
+        ResultMessage result = statement.execute(cl, queryState, variables, pageSize);
         return result == null ? new ResultMessage.Void() : result;
     }
 
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
+        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState, -1);
     }
 
-    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
+    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
         if (prepared.getBoundsTerms() != variables.size())
             throw new InvalidRequestException("Invalid amount of bind variables");
-        return processStatement(prepared, cl, queryState, variables);
+        return processStatement(prepared, cl, queryState, variables, pageSize);
     }
 
     public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
@@ -228,7 +228,7 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         // Check to see if there are any bound variables to verify
@@ -246,7 +246,7 @@ public class QueryProcessor
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
-        return processStatement(statement, cl, queryState, variables);
+        return processStatement(statement, cl, queryState, variables, pageSize);
     }
 
     public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 451efb2..df892b7 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -97,7 +97,11 @@ public class ResultSet
         String ksName = metadata.names.get(0).ksName;
         String cfName = metadata.names.get(0).cfName;
         long count = rows.size();
+        return makeCountResult(ksName, cfName, count, alias);
+    }
 
+    public static ResultSet makeCountResult(String ksName, String cfName, long count, ColumnIdentifier alias)
+    {
         ColumnSpecification spec = new ColumnSpecification(ksName, cfName, alias == null ? COUNT_COLUMN : alias, LongType.instance);
         Metadata newMetadata = new Metadata(Collections.singletonList(spec));
         List<List<ByteBuffer>> newRows = Collections.singletonList(Collections.singletonList(ByteBufferUtil.bytes(count)));
@@ -190,10 +194,10 @@ public class ResultSet
             return rs;
         }
 
-        public ChannelBuffer encode(ResultSet rs)
+        public ChannelBuffer encode(ResultSet rs, int version)
         {
             CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.names.size() * rs.rows.size());
-            builder.add(Metadata.codec.encode(rs.metadata));
+            builder.add(Metadata.codec.encode(rs.metadata, version));
             builder.add(CBUtil.intToCB(rs.rows.size()));
 
             for (List<ByteBuffer> row : rs.rows)
@@ -241,6 +245,11 @@ public class ResultSet
             return true;
         }
 
+        public void setHasMorePages()
+        {
+            flags.add(Flag.HAS_MORE_PAGES);
+        }
+
         @Override
         public String toString()
         {
@@ -251,6 +260,8 @@ public class ResultSet
                 sb.append("(").append(name.ksName).append(", ").append(name.cfName).append(")");
                 sb.append(", ").append(name.type).append("]");
             }
+            if (flags.contains(Flag.HAS_MORE_PAGES))
+                sb.append(" (to be continued)");
             return sb.toString();
         }
 
@@ -286,7 +297,7 @@ public class ResultSet
                 return new Metadata(flags, names);
             }
 
-            public ChannelBuffer encode(Metadata m)
+            public ChannelBuffer encode(Metadata m, int version)
             {
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -294,6 +305,9 @@ public class ResultSet
                 CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + m.names.size(), stringCount, 0);
 
                 ChannelBuffer header = ChannelBuffers.buffer(8);
+
+                assert version > 1 || !m.flags.contains(Flag.HAS_MORE_PAGES);
+
                 header.writeInt(Flag.serialize(m.flags));
                 header.writeInt(m.names.size());
                 builder.add(header);
@@ -322,13 +336,14 @@ public class ResultSet
     public static enum Flag
     {
         // The order of that enum matters!!
-        GLOBAL_TABLES_SPEC;
+        GLOBAL_TABLES_SPEC,
+        HAS_MORE_PAGES;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
             EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
             Flag[] values = Flag.values();
-            for (int n = 0; n < 32; n++)
+            for (int n = 0; n < values.length; n++)
             {
                 if ((flags & (1 << n)) != 0)
                     set.add(values[n]);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 64468af..97d7be5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -40,7 +40,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index af1bd17..5e317aa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -41,7 +41,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
     throws RequestValidationException, RequestExecutionException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 1436811..6fbab72 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -133,7 +133,7 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 62bd976..85728bc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -332,7 +332,7 @@ public abstract class ModificationStatement implements CQLStatement
         return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 4d40e99..1c5f051 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -68,7 +68,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public void validate(ClientState state) throws RequestValidationException
     {}
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException
     {
         announceMigration();
         String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2be85c9..fac9c27 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -39,6 +39,9 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.RangeSliceVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -54,6 +57,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class SelectStatement implements CQLStatement
 {
+    private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
+
     private final int boundTerms;
     public final CFDefinition cfDef;
     public final Parameters parameters;
@@ -130,23 +135,78 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
+
         int limit = getLimit(variables);
         long now = System.currentTimeMillis();
-        List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
-                       : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
+        Pageable command = isKeyRange || usesSecondaryIndexing
+                         ? getRangeCommand(variables, limit, now)
+                         : new Pageable.ReadCommands(getSliceCommands(variables, limit, now));
+
+        // A count query will never be paged for the user, but we always page it internally to avoid OOM.
+        // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
+        if (parameters.isCount && pageSize < 0)
+            pageSize = DEFAULT_COUNT_PAGE_SIZE;
+
+        if (pageSize < 0 || !QueryPagers.mayNeedPaging(command, pageSize))
+        {
+            return execute(command, cl, variables, limit, now);
+        }
+        else
+        {
+            QueryPager pager = QueryPagers.pager(command, cl);
+            return parameters.isCount
+                 ? pageCountQuery(pager, variables, pageSize)
+                 : setupPaging(pager, state, variables, limit, pageSize);
+        }
+    }
+
+    private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+    {
+        List<Row> rows = command instanceof Pageable.ReadCommands
+                       ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
+                       : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
 
         return processResults(rows, variables, limit, now);
     }
 
-    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    // TODO: we could probably refactor processResults so it doesn't needs the variables, so we don't have to keep around. But that can wait.
+    private ResultMessage.Rows setupPaging(QueryPager pager, QueryState state, List<ByteBuffer> variables, int limit, int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        List<Row> page = pager.fetchPage(pageSize);
+
+        ResultMessage.Rows msg = processResults(page, variables, limit, pager.timestamp());
+
+        // Don't bother setting up the pager if we actually don't need to.
+        if (pager.isExhausted())
+            return msg;
+
+        state.attachPager(pager, this, variables);
+        msg.result.metadata.setHasMorePages();
+        return msg;
+    }
+
+    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        int count = 0;
+        while (!pager.isExhausted())
+        {
+            int maxLimit = pager.maxRemaining();
+            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, pager.timestamp());
+            count += rset.rows.size();
+        }
+
+        ResultSet result = ResultSet.makeCountResult(keyspace(), columnFamily(), count, parameters.countAlias);
+        return new ResultMessage.Rows(result);
+    }
+
+    public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
         ResultSet rset = process(rows, variables, limit, now);
@@ -169,7 +229,7 @@ public class SelectStatement implements CQLStatement
         int limit = getLimit(variables);
         long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+                       ? getRangeCommand(variables, limit, now).executeLocally()
                        : readLocally(keyspace(), getSliceCommands(variables, limit, now));
 
         return processResults(rows, variables, limit, now);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 16445f5..a10415a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -54,7 +54,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 0db80bf..4806314 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -51,7 +51,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
new file mode 100644
index 0000000..1258344
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.thrift.IndexExpression;
+
+public abstract class AbstractRangeCommand implements IReadCommand
+{
+    public final String keyspace;
+    public final String columnFamily;
+    public final long timestamp;
+
+    public final AbstractBounds<RowPosition> keyRange;
+    public final IDiskAtomFilter predicate;
+    public final List<IndexExpression> rowFilter;
+
+    public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter)
+    {
+        this.keyspace = keyspace;
+        this.columnFamily = columnFamily;
+        this.timestamp = timestamp;
+        this.keyRange = keyRange;
+        this.predicate = predicate;
+        this.rowFilter = rowFilter;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public abstract MessageOut<? extends AbstractRangeCommand> createMessage();
+    public abstract AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> range);
+    public abstract AbstractRangeCommand withUpdatedLimit(int newLimit);
+
+    public abstract int limit();
+    public abstract boolean countCQL3Rows();
+    public abstract List<Row> executeLocally();
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getRangeRpcTimeout();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 735f627..62a1fdb 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -49,9 +49,11 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -1304,9 +1306,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
     {
         ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
-        OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
-        filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
-        return removeDeletedCF(cf, gcBefore(filter.timestamp));
+        OnDiskAtomIterator ci = filter.getColumnFamilyIterator(cached);
+
+        int gcBefore = gcBefore(filter.timestamp);
+        filter.collateOnDiskAtom(cf, ci, gcBefore);
+        return removeDeletedCF(cf, gcBefore);
     }
 
     /**
@@ -1466,26 +1470,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
       * Iterate over a range of rows and columns from memtables/sstables.
       *
-      * @param range Either a Bounds, which includes start key, or a Range, which does not.
-      * @param columnFilter description of the columns we're interested in for each row
+      * @param range The range of keys and columns within those keys to fetch
      */
-    private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
-                                                       IDiskAtomFilter columnFilter,
-                                                       long timestamp)
+    private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
     {
-        assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
-
-        final RowPosition startWith = range.left;
-        final RowPosition stopAt = range.right;
-
-        QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
+        assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
 
-        final ViewFragment view = markReferenced(range);
-        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
+        final ViewFragment view = markReferenced(range.keyRange());
+        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
 
         try
         {
-            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
 
             // todo this could be pushed into SSTableScanner
             return new AbstractScanIterator()
@@ -1499,7 +1495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     Row current = iterator.next();
                     DecoratedKey key = current.key;
 
-                    if (!stopAt.isMinimum() && stopAt.compareTo(key) < 0)
+                    if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
                         return endOfData();
 
                     // skipping outside of assigned range
@@ -1527,43 +1523,108 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    @VisibleForTesting
     public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
                                    List<IndexExpression> rowFilter,
                                    IDiskAtomFilter columnFilter,
                                    int maxResults)
     {
-        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
+        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis());
     }
 
     public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
                                    List<IndexExpression> rowFilter,
                                    IDiskAtomFilter columnFilter,
                                    int maxResults,
+                                   long now)
+    {
+        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now));
+    }
+
+    /**
+     * Allows generic range paging with the slice column filter.
+     * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
+     * And suppose we want to page throught the query that for all rows returns the columns
+     * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
+     * and ending at (row Z, column 75), *but* that only return columns in [25, 75].
+     * That is what this method allows. The columnRange is the "window" of  columns we are interested
+     * in each row, and columnStart (resp. columnEnd) is the start (resp. end) for the first
+     * (resp. end) requested row.
+     */
+    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
+                                             SliceQueryFilter columnRange,
+                                             ByteBuffer columnStart,
+                                             ByteBuffer columnStop,
+                                             List<IndexExpression> rowFilter,
+                                             int maxResults,
+                                             long now)
+    {
+        DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator);
+        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, true, now);
+    }
+
+    public List<Row> getRangeSlice(AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults,
                                    long now,
                                    boolean countCQL3Rows,
                                    boolean isPaging)
     {
-        return filter(getSequentialIterator(range, columnFilter, now),
-                      ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
+        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now));
     }
 
+    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range,
+                                             IDiskAtomFilter columnFilter,
+                                             List<IndexExpression> rowFilter,
+                                             int maxResults,
+                                             boolean countCQL3Rows,
+                                             boolean isPaging,
+                                             long timestamp)
+    {
+        DataRange dataRange;
+        if (isPaging)
+        {
+            assert columnFilter instanceof SliceQueryFilter;
+            SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
+            assert sfilter.slices.length == 1;
+            SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
+            dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
+        }
+        else
+        {
+            dataRange = new DataRange(range, columnFilter);
+        }
+        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp);
+    }
+
+    public List<Row> getRangeSlice(ExtendedFilter filter)
+    {
+        return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter);
+    }
+
+    @VisibleForTesting
     public List<Row> search(AbstractBounds<RowPosition> range,
                             List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
+                            IDiskAtomFilter dataFilter,
                             int maxResults)
     {
-        return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
+        return search(range, clause, dataFilter, maxResults, System.currentTimeMillis());
     }
 
     public List<Row> search(AbstractBounds<RowPosition> range,
                             List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
+                            IDiskAtomFilter dataFilter,
                             int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+                            long now)
+    {
+        return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now));
+    }
+
+    public List<Row> search(ExtendedFilter filter)
     {
-        Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
-        return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+        Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator()));
+        return indexManager.search(filter);
     }
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1584,7 +1645,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 if (rowIterator.needsFiltering())
                 {
-                    IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+                    IDiskAtomFilter extraFilter = filter.getExtraFilter(rawRow.key, data);
                     if (extraFilter != null)
                     {
                         ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
@@ -1594,12 +1655,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                     removeDroppedColumns(data);
 
-                    if (!filter.isSatisfiedBy(rawRow.key.key, data, null))
+                    if (!filter.isSatisfiedBy(rawRow.key, data, null))
                         continue;
 
                     logger.trace("{} satisfies all filter expressions", data);
                     // cut the resultset back to what was requested, if necessary
-                    data = filter.prune(data);
+                    data = filter.prune(rawRow.key, data);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
new file mode 100644
index 0000000..d764d60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.*;
+
+/**
+ * Groups key range and column filter for range queries.
+ *
+ * The main "trick" of this class is that the column filter can only
+ * be obtained by providing the row key on which the column filter will
+ * be applied (which we always know before actually querying the columns).
+ *
+ * This allows the paging DataRange to return a filter for most rows but a
+ * potentially different ones for the starting and stopping key. Could
+ * allow more fancy stuff in the future too, like column filters that
+ * depend on the actual key value :)
+ */
+public class DataRange
+{
+    private final AbstractBounds<RowPosition> keyRange;
+    protected IDiskAtomFilter columnFilter;
+    protected final boolean selectFullRow;
+
+    public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    {
+        assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
+
+        this.keyRange = range;
+        this.columnFilter = columnFilter;
+        this.selectFullRow = columnFilter instanceof SliceQueryFilter
+                           ? isFullRowSlice((SliceQueryFilter)columnFilter)
+                           : false;
+    }
+
+    public static boolean isFullRowSlice(SliceQueryFilter filter)
+    {
+        return filter.slices.length == 1
+            && filter.start().remaining() == 0
+            && filter.finish().remaining() == 0
+            && filter.count == Integer.MAX_VALUE;
+    }
+
+    public static DataRange allData(IPartitioner partitioner)
+    {
+        return forKeyRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+    }
+
+    public static DataRange forKeyRange(Range<Token> keyRange)
+    {
+        return new DataRange(keyRange.toRowBounds(), new IdentityQueryFilter());
+    }
+
+    public AbstractBounds<RowPosition> keyRange()
+    {
+        return keyRange;
+    }
+
+    public RowPosition startKey()
+    {
+        return keyRange.left;
+    }
+
+    public RowPosition stopKey()
+    {
+        return keyRange.right;
+    }
+
+    public boolean contains(RowPosition pos)
+    {
+        return keyRange.contains(pos);
+    }
+
+    public int getLiveCount(ColumnFamily data, long now)
+    {
+        return columnFilter instanceof SliceQueryFilter
+             ? ((SliceQueryFilter)columnFilter).lastCounted()
+             : columnFilter.getLiveCount(data, now);
+    }
+
+    public boolean selectsFullRowFor(ByteBuffer rowKey)
+    {
+        return selectFullRow;
+    }
+
+    public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+    {
+        return columnFilter;
+    }
+
+    public void updateColumnsLimit(int count)
+    {
+        columnFilter.updateColumnsLimit(count);
+    }
+
+    public static class Paging extends DataRange
+    {
+        private final SliceQueryFilter sliceFilter;
+        private final Comparator<ByteBuffer> comparator;
+        private final ByteBuffer columnStart;
+        private final ByteBuffer columnFinish;
+
+        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+        {
+            super(range, filter);
+
+            this.sliceFilter = filter;
+            this.comparator = comparator;
+            this.columnStart = columnStart;
+            this.columnFinish = columnFinish;
+        }
+
+        public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+        {
+            this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+        }
+
+        @Override
+        public boolean selectsFullRowFor(ByteBuffer rowKey)
+        {
+            // If we initial filter is not the full filter, don't bother
+            if (!selectFullRow)
+                return false;
+
+            if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
+                return selectFullRow;
+
+            return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
+        }
+
+        private boolean equals(RowPosition pos, ByteBuffer rowKey)
+        {
+            return pos instanceof DecoratedKey && ((DecoratedKey)pos).key.equals(rowKey);
+        }
+
+        @Override
+        public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+        {
+            /*
+             * We have that ugly hack that for slice queries, when we ask for
+             * the live count, we reach into the query filter to get the last
+             * counter number of columns to avoid recounting.
+             * Maybe we should just remove that hack, but in the meantime, we
+             * need to keep a reference the last returned filter.
+             */
+            columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey)
+                         ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey))
+                         : sliceFilter;
+            return columnFilter;
+        }
+
+        private ColumnSlice[] slicesForKey(ByteBuffer key)
+        {
+            // We don't call that until it's necessary, so assume we have to do some hard work
+            ByteBuffer newStart = equals(startKey(), key) ? columnStart : null;
+            ByteBuffer newFinish = equals(stopKey(), key) ? columnFinish : null;
+
+            List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+
+            for (ColumnSlice slice : sliceFilter.slices)
+            {
+                if (newStart != null)
+                {
+                    if (slice.isBefore(comparator, newStart))
+                        continue; // we skip that slice
+
+                    if (slice.includes(comparator, newStart))
+                        slice = new ColumnSlice(newStart, slice.finish);
+
+                    // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+                    newStart = null;
+                }
+
+                assert newStart == null;
+                if (newFinish != null && !slice.isBefore(comparator, newFinish))
+                {
+                    if (slice.includes(comparator, newFinish))
+                        newSlices.add(new ColumnSlice(slice.start, newFinish));
+                    // In any case, we're done
+                    break;
+                }
+                newSlices.add(slice);
+            }
+
+            return newSlices.toArray(new ColumnSlice[newSlices.size()]);
+        }
+
+        @Override
+        public void updateColumnsLimit(int count)
+        {
+            columnFilter.updateColumnsLimit(count);
+            sliceFilter.updateColumnsLimit(count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index e89c769..3a30701 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -480,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
         IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
+        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
         for (Row row : rows)
         {
             UUID hostId = UUIDGen.getUUID(row.key.key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ad6258a..c31c882 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
@@ -336,52 +335,6 @@ public class Memtable
         return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
     }
 
-    /**
-     * obtain an iterator of columns in this memtable in the specified order starting from a given column.
-     */
-    public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
-    {
-        assert cf != null;
-        final Iterator<Column> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
-
-        return new OnDiskAtomIterator()
-        {
-            public ColumnFamily getColumnFamily()
-            {
-                return cf;
-            }
-
-            public DecoratedKey getKey()
-            {
-                return key;
-            }
-
-            public boolean hasNext()
-            {
-                return filteredIter.hasNext();
-            }
-
-            public OnDiskAtom next()
-            {
-                return filteredIter.next();
-            }
-
-            public void close() throws IOException { }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    public static OnDiskAtomIterator getNamesIterator(final DecoratedKey key, final ColumnFamily cf, final NamesQueryFilter filter)
-    {
-        assert cf != null;
-
-        return new ByNameColumnIterator(filter.columns.iterator(), cf, key);
-    }
-
     public ColumnFamily getColumnFamily(DecoratedKey key)
     {
         return rows.get(key);
@@ -392,44 +345,6 @@ public class Memtable
         return creationTime;
     }
 
-    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-    {
-        private final ColumnFamily cf;
-        private final DecoratedKey key;
-        private final Iterator<ByteBuffer> iter;
-
-        public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
-        {
-            this.iter = iter;
-            this.cf = cf;
-            this.key = key;
-        }
-
-        public ColumnFamily getColumnFamily()
-        {
-            return cf;
-        }
-
-        public DecoratedKey getKey()
-        {
-            return key;
-        }
-
-        protected OnDiskAtom computeNext()
-        {
-            while (iter.hasNext())
-            {
-                ByteBuffer current = iter.next();
-                Column column = cf.getColumn(current);
-                if (column != null)
-                    return column;
-            }
-            return endOfData();
-        }
-
-        public void close() throws IOException { }
-    }
-
     class FlushRunnable extends DiskAwareRunnable
     {
         private final CountDownLatch latch;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
new file mode 100644
index 0000000..0e1fa4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class PagedRangeCommand extends AbstractRangeCommand
+{
+    public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
+
+    public final ByteBuffer start;
+    public final ByteBuffer stop;
+    public final int limit;
+
+    public PagedRangeCommand(String keyspace,
+                             String columnFamily,
+                             long timestamp,
+                             AbstractBounds<RowPosition> keyRange,
+                             SliceQueryFilter predicate,
+                             ByteBuffer start,
+                             ByteBuffer stop,
+                             List<IndexExpression> rowFilter,
+                             int limit)
+    {
+        super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
+        this.start = start;
+        this.stop = stop;
+        this.limit = limit;
+    }
+
+    public MessageOut<PagedRangeCommand> createMessage()
+    {
+        return new MessageOut<PagedRangeCommand>(MessagingService.Verb.PAGED_RANGE, this, serializer);
+    }
+
+    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+    {
+        ByteBuffer newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
+        ByteBuffer newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
+        return new PagedRangeCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     subRange,
+                                     (SliceQueryFilter)predicate,
+                                     newStart,
+                                     newStop,
+                                     rowFilter,
+                                     limit);
+    }
+
+    public AbstractRangeCommand withUpdatedLimit(int newLimit)
+    {
+        return new PagedRangeCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     keyRange,
+                                     (SliceQueryFilter)predicate,
+                                     start,
+                                     stop,
+                                     rowFilter,
+                                     newLimit);
+    }
+
+    public int limit()
+    {
+        return limit;
+    }
+
+    public boolean countCQL3Rows()
+    {
+        return true;
+    }
+
+    public List<Row> executeLocally()
+    {
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, timestamp);
+        if (cfs.indexManager.hasIndexFor(rowFilter))
+            return cfs.search(exFilter);
+        else
+            return cfs.getRangeSlice(exFilter);
+    }
+
+    private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
+    {
+        public void serialize(PagedRangeCommand cmd, DataOutput out, int version) throws IOException
+        {
+            out.writeUTF(cmd.keyspace);
+            out.writeUTF(cmd.columnFamily);
+            out.writeLong(cmd.timestamp);
+
+            AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
+
+            // SliceQueryFilter (the count is not used)
+            SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
+            SliceQueryFilter.serializer.serialize(filter, out, version);
+
+            // The start and stop of the page
+            ByteBufferUtil.writeWithShortLength(cmd.start, out);
+            ByteBufferUtil.writeWithShortLength(cmd.stop, out);
+
+            out.writeInt(cmd.rowFilter.size());
+            for (IndexExpression expr : cmd.rowFilter)
+            {
+                ByteBufferUtil.writeWithShortLength(expr.column_name, out);
+                out.writeInt(expr.op.getValue());
+                ByteBufferUtil.writeWithLength(expr.value, out);
+            }
+
+            out.writeInt(cmd.limit);
+        }
+
+        public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
+        {
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+            long timestamp = in.readLong();
+
+            AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
+
+            SliceQueryFilter predicate = SliceQueryFilter.serializer.deserialize(in, version);
+
+            ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
+            ByteBuffer stop = ByteBufferUtil.readWithShortLength(in);
+
+            int filterCount = in.readInt();
+            List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
+            for (int i = 0; i < filterCount; i++)
+            {
+                IndexExpression expr = new IndexExpression(ByteBufferUtil.readWithShortLength(in),
+                                                           IndexOperator.findByValue(in.readInt()),
+                                                           ByteBufferUtil.readWithShortLength(in));
+                rowFilter.add(expr);
+            }
+
+            int limit = in.readInt();
+            return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
+        }
+
+        public long serializedSize(PagedRangeCommand cmd, int version)
+        {
+            long size = 0;
+
+            size += TypeSizes.NATIVE.sizeof(cmd.keyspace);
+            size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
+            size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
+
+            size += AbstractBounds.serializer.serializedSize(cmd.keyRange, version);
+
+            size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)cmd.predicate, version);
+
+            size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
+            for (IndexExpression expr : cmd.rowFilter)
+            {
+                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
+                size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
+                size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
+            }
+
+            size += TypeSizes.NATIVE.sizeof(cmd.limit);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c7e71f3..c037518 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -23,10 +23,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -34,25 +35,15 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.service.pager.Pageable;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class RangeSliceCommand implements IReadCommand
+public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
 {
     public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
 
-    public final String keyspace;
-
-    public final String column_family;
-
-    public final long timestamp;
-
-    public final IDiskAtomFilter predicate;
-    public final List<IndexExpression> row_filter;
-
-    public final AbstractBounds<RowPosition> range;
     public final int maxResults;
     public final boolean countCQL3Rows;
     public final boolean isPaging;
@@ -88,12 +79,7 @@ public class RangeSliceCommand implements IReadCommand
                              boolean countCQL3Rows,
                              boolean isPaging)
     {
-        this.keyspace = keyspace;
-        this.column_family = column_family;
-        this.timestamp = timestamp;
-        this.predicate = predicate;
-        this.range = range;
-        this.row_filter = row_filter;
+        super(keyspace, column_family, timestamp, range, predicate, row_filter);
         this.maxResults = maxResults;
         this.countCQL3Rows = countCQL3Rows;
         this.isPaging = isPaging;
@@ -104,29 +90,66 @@ public class RangeSliceCommand implements IReadCommand
         return new MessageOut<RangeSliceCommand>(MessagingService.Verb.RANGE_SLICE, this, serializer);
     }
 
+    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+    {
+        return new RangeSliceCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     predicate,
+                                     subRange,
+                                     rowFilter,
+                                     maxResults,
+                                     countCQL3Rows,
+                                     isPaging);
+    }
+
+    public AbstractRangeCommand withUpdatedLimit(int newLimit)
+    {
+        return new RangeSliceCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     predicate,
+                                     keyRange,
+                                     rowFilter,
+                                     newLimit,
+                                     countCQL3Rows,
+                                     isPaging);
+    }
+
+    public int limit()
+    {
+        return maxResults;
+    }
+
+    public boolean countCQL3Rows()
+    {
+        return countCQL3Rows;
+    }
+
+    public List<Row> executeLocally()
+    {
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
+        if (cfs.indexManager.hasIndexFor(rowFilter))
+            return cfs.search(exFilter);
+        else
+            return cfs.getRangeSlice(exFilter);
+    }
+
     @Override
     public String toString()
     {
         return "RangeSliceCommand{" +
                "keyspace='" + keyspace + '\'' +
-               ", column_family='" + column_family + '\'' +
-               ", timestamp='" + timestamp + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", timestamp=" + timestamp +
                ", predicate=" + predicate +
-               ", range=" + range +
-               ", row_filter =" + row_filter +
+               ", range=" + keyRange +
+               ", rowFilter =" + rowFilter +
                ", maxResults=" + maxResults +
                ", countCQL3Rows=" + countCQL3Rows +
-               '}';
-    }
-
-    public String getKeyspace()
-    {
-        return keyspace;
-    }
-
-    public long getTimeout()
-    {
-        return DatabaseDescriptor.getRangeRpcTimeout();
+               "}";
     }
 }
 
@@ -135,7 +158,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
     {
         out.writeUTF(sliceCommand.keyspace);
-        out.writeUTF(sliceCommand.column_family);
+        out.writeUTF(sliceCommand.columnFamily);
 
         if (version >= MessagingService.VERSION_20)
             out.writeLong(sliceCommand.timestamp);
@@ -147,7 +170,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             // must extract the super column name from the predicate (and
             // modify the predicate accordingly)
             ByteBuffer sc = null;
-            CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.column_family);
+            CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.columnFamily);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -162,21 +185,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         IDiskAtomFilter.Serializer.instance.serialize(filter, out, version);
 
-        if (sliceCommand.row_filter == null)
+        if (sliceCommand.rowFilter == null)
         {
             out.writeInt(0);
         }
         else
         {
-            out.writeInt(sliceCommand.row_filter.size());
-            for (IndexExpression expr : sliceCommand.row_filter)
+            out.writeInt(sliceCommand.rowFilter.size());
+            for (IndexExpression expr : sliceCommand.rowFilter)
             {
                 ByteBufferUtil.writeWithShortLength(expr.column_name, out);
                 out.writeInt(expr.op.getValue());
                 ByteBufferUtil.writeWithShortLength(expr.value, out);
             }
         }
-        AbstractBounds.serializer.serialize(sliceCommand.range, out, version);
+        AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version);
         out.writeInt(sliceCommand.maxResults);
         out.writeBoolean(sliceCommand.countCQL3Rows);
         out.writeBoolean(sliceCommand.isPaging);
@@ -246,7 +269,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     public long serializedSize(RangeSliceCommand rsc, int version)
     {
         long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
-        size += TypeSizes.NATIVE.sizeof(rsc.column_family);
+        size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
 
         if (version >= MessagingService.VERSION_20)
             size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
@@ -255,7 +278,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         if (version < MessagingService.VERSION_20)
         {
             ByteBuffer sc = null;
-            CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.column_family);
+            CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -276,21 +299,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
 
-        if (rsc.row_filter == null)
+        if (rsc.rowFilter == null)
         {
             size += TypeSizes.NATIVE.sizeof(0);
         }
         else
         {
-            size += TypeSizes.NATIVE.sizeof(rsc.row_filter.size());
-            for (IndexExpression expr : rsc.row_filter)
+            size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
+            for (IndexExpression expr : rsc.rowFilter)
             {
                 size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
                 size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
                 size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
             }
         }
-        size += AbstractBounds.serializer.serializedSize(rsc.range, version);
+        size += AbstractBounds.serializer.serializedSize(rsc.keyRange, version);
         size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
         size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
         size += TypeSizes.NATIVE.sizeof(rsc.isPaging);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 61a2478..3031da8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,8 +34,9 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.RowDataResolver;
+import org.apache.cassandra.service.pager.Pageable;
 
-public abstract class ReadCommand implements IReadCommand
+public abstract class ReadCommand implements IReadCommand, Pageable
 {
     public enum Type
     {