You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:42 UTC

[4/4] git commit: Add auto paging capability to the native protocol

Add auto paging capability to the native protocol

This also generalizes the paging used internally, and pages CQL3 'select
count' operations to avoid OOM.

patch by slebresne; reviewed by iamaleksey for CASSANDRA-4415


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e48ff293
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e48ff293
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e48ff293

Branch: refs/heads/trunk
Commit: e48ff29387547c0837ab381f6e890f8417a1b65c
Parents: 40bc445
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue May 28 10:17:26 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 25 10:30:12 2013 +0200

----------------------------------------------------------------------
 doc/native_protocol_v2.spec                     |  92 +++++-
 src/java/org/apache/cassandra/auth/Auth.java    |   3 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   3 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   3 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |   5 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  14 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |  25 +-
 .../statements/AuthenticationStatement.java     |   2 +-
 .../cql3/statements/AuthorizationStatement.java |   2 +-
 .../cql3/statements/BatchStatement.java         |   2 +-
 .../cql3/statements/ModificationStatement.java  |   2 +-
 .../statements/SchemaAlteringStatement.java     |   2 +-
 .../cql3/statements/SelectStatement.java        |  72 ++++-
 .../cql3/statements/TruncateStatement.java      |   2 +-
 .../cassandra/cql3/statements/UseStatement.java |   2 +-
 .../cassandra/db/AbstractRangeCommand.java      |  67 +++++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 123 ++++++--
 src/java/org/apache/cassandra/db/DataRange.java | 218 ++++++++++++++
 .../cassandra/db/HintedHandOffManager.java      |   2 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  85 ------
 .../apache/cassandra/db/PagedRangeCommand.java  | 196 +++++++++++++
 .../apache/cassandra/db/RangeSliceCommand.java  | 115 +++++---
 .../org/apache/cassandra/db/ReadCommand.java    |   3 +-
 .../apache/cassandra/db/RowIteratorFactory.java |  30 +-
 .../cassandra/db/SliceFromReadCommand.java      |   5 +
 .../apache/cassandra/db/SliceQueryPager.java    |  88 ------
 .../org/apache/cassandra/db/SystemTable.java    |   3 +-
 src/java/org/apache/cassandra/db/Table.java     |   4 +-
 .../cassandra/db/filter/ColumnCounter.java      |  10 +
 .../apache/cassandra/db/filter/ColumnSlice.java |   6 +
 .../cassandra/db/filter/ExtendedFilter.java     | 237 +++++++---------
 .../cassandra/db/filter/IDiskAtomFilter.java    |   5 +-
 .../cassandra/db/filter/NamesQueryFilter.java   |  58 +++-
 .../apache/cassandra/db/filter/QueryFilter.java |  20 +-
 .../cassandra/db/filter/SliceQueryFilter.java   |  77 ++++-
 .../db/index/SecondaryIndexManager.java         |  12 +-
 .../db/index/SecondaryIndexSearcher.java        |  10 +-
 .../db/index/composites/CompositesSearcher.java |  30 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  20 +-
 .../cassandra/io/sstable/SSTableReader.java     |  16 +-
 .../cassandra/io/sstable/SSTableScanner.java    |  52 ++--
 .../apache/cassandra/net/MessagingService.java  |   5 +
 .../apache/cassandra/service/QueryState.java    |  64 ++++-
 .../service/RangeSliceVerbHandler.java          |  31 +-
 .../apache/cassandra/service/StorageProxy.java  |  49 +---
 .../service/pager/AbstractQueryPager.java       | 245 ++++++++++++++++
 .../service/pager/MultiPartitionPager.java      | 114 ++++++++
 .../service/pager/NamesQueryPager.java          |  94 ++++++
 .../cassandra/service/pager/Pageable.java       |  38 +++
 .../cassandra/service/pager/QueryPager.java     |  75 +++++
 .../cassandra/service/pager/QueryPagers.java    | 184 ++++++++++++
 .../service/pager/RangeNamesQueryPager.java     |  90 ++++++
 .../service/pager/RangeSliceQueryPager.java     |  99 +++++++
 .../service/pager/SinglePartitionPager.java     |  30 ++
 .../service/pager/SliceQueryPager.java          |  72 +++++
 .../cassandra/thrift/CassandraServer.java       | 142 ++++------
 .../cassandra/thrift/ThriftConversion.java      |   4 +-
 .../org/apache/cassandra/transport/CBCodec.java |   2 +-
 .../org/apache/cassandra/transport/Client.java  |  44 ++-
 .../org/apache/cassandra/transport/Message.java |   7 +-
 .../cassandra/transport/ServerConnection.java   |  18 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/AuthChallenge.java       |   4 +-
 .../transport/messages/AuthResponse.java        |   4 +-
 .../transport/messages/AuthSuccess.java         |   4 +-
 .../transport/messages/AuthenticateMessage.java |   4 +-
 .../transport/messages/BatchMessage.java        |   4 +-
 .../transport/messages/CredentialsMessage.java  |   4 +-
 .../transport/messages/ErrorMessage.java        |   4 +-
 .../transport/messages/EventMessage.java        |   4 +-
 .../transport/messages/ExecuteMessage.java      |  34 ++-
 .../transport/messages/NextMessage.java         | 118 ++++++++
 .../transport/messages/OptionsMessage.java      |   4 +-
 .../transport/messages/PrepareMessage.java      |   4 +-
 .../transport/messages/QueryMessage.java        |  36 ++-
 .../transport/messages/ReadyMessage.java        |   4 +-
 .../transport/messages/RegisterMessage.java     |   4 +-
 .../transport/messages/ResultMessage.java       |  28 +-
 .../transport/messages/StartupMessage.java      |   4 +-
 .../transport/messages/SupportedMessage.java    |   4 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 113 +++++---
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |   4 +-
 .../cassandra/service/QueryPagerTest.java       | 283 +++++++++++++++++++
 84 files changed, 2998 insertions(+), 812 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 3959a15..2cc771d 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -22,6 +22,7 @@ Table of Contents
       4.1.6. EXECUTE
       4.1.7. BATCH
       4.1.8. REGISTER
+      4.1.9. NEXT
     4.2. Responses
       4.2.1. ERROR
       4.2.2. READY
@@ -38,8 +39,9 @@ Table of Contents
       4.2.8. AUTH_SUCCESS
   5. Compression
   6. Collection types
-  7. Error codes
-  8. Changes from v1
+  7. Result paging
+  8. Error codes
+  9. Changes from v1
 
 
 1. Overview
@@ -98,7 +100,7 @@ Table of Contents
     0x82    Response frame for this protocol version
 
   This document describe the version 2 of the protocol. For the changes made since
-  version 1, see Section 8.
+  version 1, see Section 9.
 
 
 2.2. flags
@@ -165,6 +167,7 @@ Table of Contents
     0x0E    AUTH_CHALLENGE
     0x0F    AUTH_RESPONSE
     0x10    AUTH_SUCCESS
+    0x11    NEXT
 
   Messages are described in Section 4.
 
@@ -276,10 +279,13 @@ Table of Contents
 4.1.4. QUERY
 
   Performs a CQL query. The body of the message must be:
-    <query><consistency>[<n><value_1>...<value_n>]
+    <query><consistency><result_page_size>[<n><value_1>...<value_n>]
   where:
     - <query> the query, [long string].
     - <consistency> is the [consistency] level for the operation.
+    - <result_page_size> is an [int] controlling the desired page size of the
+      result (in CQL3 rows). A negative value disables paging of the result. See the
+      section on paging (Section 7) for more details.
     - optional: <n> [short], the number of following values.
     - optional: <value_1>...<value_n> are [bytes] to use for bound variables in the query.
 
@@ -302,7 +308,7 @@ Table of Contents
 4.1.6. EXECUTE
 
   Executes a prepared query. The body of the message must be:
-    <id><n><value_1>....<value_n><consistency>
+    <id><n><value_1>....<value_n><consistency><result_page_size>
   where:
     - <id> is the prepared query ID. It's the [short bytes] returned as a
       response to a PREPARE message.
@@ -310,6 +316,9 @@ Table of Contents
     - <value_1>...<value_n> are the [bytes] to use for bound variables in the
       prepared query.
     - <consistency> is the [consistency] level for the operation.
+    - <result_page_size> is an [int] controlling the desired page size of the
+      result (in CQL3 rows). A negative value disables paging of the result. See the
+      section on paging (Section 7) for more details.
 
   Note that the consistency is ignored by some (prepared) queries (USE, CREATE,
   ALTER, TRUNCATE, ...).
@@ -360,6 +369,17 @@ Table of Contents
   multiple times the same event messages, wasting bandwidth.
 
 
+4.1.9. NEXT
+
+  Requests the next page of results if paging was requested by a QUERY or EXECUTE
+  statement and there are more results to fetch (see Section 7 for more details).
+  The body of a NEXT message is a single [int] indicating the maximum number of
+  rows to return with the next page of results (it is equivalent to the
+  <result_page_size> in a QUERY or EXECUTE message).
+
+  The response to a NEXT message will be a RESULT message.
+
+
 4.2. Responses
 
   This section describes the content of the frame body for the different
@@ -372,7 +392,7 @@ Table of Contents
   Indicates an error processing a request. The body of the message will be an
   error code ([int]) followed by a [string] error message. Then, depending on
   the exception, more content may follow. The error codes are defined in
-  Section 7, along with their additional content if any.
+  Section 8, along with their additional content if any.
 
 
 4.2.2. READY
@@ -450,6 +470,12 @@ Table of Contents
             0x0001    Global_tables_spec: if set, only one table spec (keyspace
                       and table name) is provided as <global_table_spec>. If not
                       set, <global_table_spec> is not present.
+            0x0002    Has_more_pages: indicates that this is not the last
+                      page of results and more should be retrieved using a NEXT
+                      message. If not set, this is the last "page" of results
+                      and NEXT cannot and should not be used. If no result
+                      paging has been requested in the QUERY/EXECUTE/BATCH
+                      message, this will never be set.
         - <columns_count> is an [int] representing the number of columns selected
           by the query this result is of. It defines the number of <col_spec_i>
           elements in and the number of element for each row in <rows_content>.
@@ -623,7 +649,52 @@ Table of Contents
           value.
 
 
-7. Error codes
+7. Result paging
+
+  The protocol allows for paging the result of queries. For that, the QUERY and
+  EXECUTE messages have a <result_page_size> value that indicates the desired
+  page size in CQL3 rows.
+
+  If a positive value is provided for <result_page_size>, the result set of the
+  RESULT message returned for the query will contain at most the
+  <result_page_size> first rows of the query result. If that first page of results
+  contains the full result set for the query, the RESULT message (of kind `Rows`)
+  will have the Has_more_pages flag *not* set. However, if some results are not
+  part of the first response, the Has_more_pages flag will be set. In that latter
+  case, more rows of the result can be retrieved by sending a NEXT message *with the
+  same stream id as the initial query*. The NEXT message also contains its own
+  <result_page_size> that controls how many of the remaining result rows will be
+  sent in response. If the response to this NEXT message still does not contain
+  the full remainder of the query result set (the Has_more_pages flag is set once more),
+  another NEXT message can be sent for more results, etc.
+
+  If a RESULT message has the Has_more_pages flag set and any message other than
+  a NEXT message is sent on the same stream id, the query is cancelled and no more
+  of its results can be retrieved.
+
+  Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+  support paging. For other types of queries, the <result_page_size> value is
+  ignored.
+
+  The <result_page_size> can be set to a negative value to disable paging (in
+  which case the whole result set will be returned in the first RESULT message,
+  which will not have the Has_more_pages flag set). The
+  <result_page_size> value cannot be 0.
+
+  Note to client implementors:
+  - While <result_page_size> can be as low as 1, it will likely be detrimental
+    to performance to pick a value too low. A value below 100 is probably too
+    low for most use cases.
+  - Clients should not rely on the actual size of the result set returned to
+    decide if a NEXT message should be issued. Instead, they should always
+    check the Has_more_pages flag (unless they did not enable paging for the query,
+    obviously). Clients should also not assert that no result will have more than
+    <result_page_size> results. While the current implementation always respects
+    the exact value of <result_page_size>, we reserve the right to return
+    slightly smaller or bigger pages in the future for performance reasons.
+
+
+8. Error codes
 
   The supported error codes are described below:
     0x0000    Server error: something unexpected happened. This indicates a
@@ -715,7 +786,7 @@ Table of Contents
               this host. The rest of the ERROR message body will be [short
               bytes] representing the unknown ID.
 
-8. Changes from v1
+9. Changes from v1
   * Protocol is versioned to allow old client connects to a newer server, if a
     newer client connects to an older server, it needs to check if it gets a
     ProtocolException on connection and try connecting with a lower version.
@@ -727,3 +798,8 @@ Table of Contents
     removed and replaced by a server/client challenges/responses exchanges (done
     through the new AUTH_RESPONSE/AUTH_CHALLENGE messages). See Section 4.2.3 for
     details.
+  * Query paging has been added (Section 7): QUERY and EXECUTE messages have an
+    additional <result_page_size> [int], a new NEXT message has been added and
+    the Rows kind of RESULT message has an additional flag. Note that paging is
+    optional, and a client that doesn't want to handle it can always pass -1 for
+    the <result_page_size> in QUERY and EXECUTE.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index c561aab..559e10c 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -232,7 +232,8 @@ public class Auth
         {
             ResultMessage.Rows rows = selectUserStatement.execute(consistencyForUser(username),
                                                                   new QueryState(new ClientState(true)),
-                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username)));
+                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username)),
+                                                                  -1);
             return new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 396be71..6f490f8 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -74,7 +74,8 @@ public class CassandraAuthorizer implements IAuthorizer
             ResultMessage.Rows rows = authorizeStatement.execute(ConsistencyLevel.ONE,
                                                                  new QueryState(new ClientState(true)),
                                                                  Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
-                                                                                    ByteBufferUtil.bytes(resource.getName())));
+                                                                                    ByteBufferUtil.bytes(resource.getName())),
+                                                                 -1);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 12dbdee..4d37b7e 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -108,7 +108,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
         {
             ResultMessage.Rows rows = authenticateStatement.execute(consistencyForUser(username),
                                                                     new QueryState(new ClientState(true)),
-                                                                    Lists.newArrayList(ByteBufferUtil.bytes(username)));
+                                                                    Lists.newArrayList(ByteBufferUtil.bytes(username)),
+                                                                    -1);
             result = new UntypedResultSet(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 63f9cc6..a4abaf1 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -51,11 +51,14 @@ public interface CQLStatement
     /**
      * Execute the statement and return the resulting result or null if there is no result.
      *
+     * @param cl the consistency level for the query
      * @param state the current query state
      * @param variables the values for bounded variables. The implementation
      * can assume that each bound term have a corresponding value.
+     * @param pageSize the initial page size for the result set potentially returned. A negative value
+     * means no paging needs to be done. Statements that do not return result sets can ignore this value.
      */
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variante of execute used for internal query against the system tables, and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 1b89fe3..1de985b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -106,30 +106,30 @@ public class QueryProcessor
         }
     }
 
-    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("Process {} @CL.{}", statement, cl);
         ClientState clientState = queryState.getClientState();
         statement.checkAccess(clientState);
         statement.validate(clientState);
-        ResultMessage result = statement.execute(cl, queryState, variables);
+        ResultMessage result = statement.execute(cl, queryState, variables, pageSize);
         return result == null ? new ResultMessage.Void() : result;
     }
 
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
+        return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState, -1);
     }
 
-    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
+    public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
         if (prepared.getBoundsTerms() != variables.size())
             throw new InvalidRequestException("Invalid amount of bind variables");
-        return processStatement(prepared, cl, queryState, variables);
+        return processStatement(prepared, cl, queryState, variables, pageSize);
     }
 
     public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
@@ -228,7 +228,7 @@ public class QueryProcessor
         }
     }
 
-    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         // Check to see if there are any bound variables to verify
@@ -246,7 +246,7 @@ public class QueryProcessor
                     logger.trace("[{}] '{}'", i+1, variables.get(i));
         }
 
-        return processStatement(statement, cl, queryState, variables);
+        return processStatement(statement, cl, queryState, variables, pageSize);
     }
 
     public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 451efb2..df892b7 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -97,7 +97,11 @@ public class ResultSet
         String ksName = metadata.names.get(0).ksName;
         String cfName = metadata.names.get(0).cfName;
         long count = rows.size();
+        return makeCountResult(ksName, cfName, count, alias);
+    }
 
+    public static ResultSet makeCountResult(String ksName, String cfName, long count, ColumnIdentifier alias)
+    {
         ColumnSpecification spec = new ColumnSpecification(ksName, cfName, alias == null ? COUNT_COLUMN : alias, LongType.instance);
         Metadata newMetadata = new Metadata(Collections.singletonList(spec));
         List<List<ByteBuffer>> newRows = Collections.singletonList(Collections.singletonList(ByteBufferUtil.bytes(count)));
@@ -190,10 +194,10 @@ public class ResultSet
             return rs;
         }
 
-        public ChannelBuffer encode(ResultSet rs)
+        public ChannelBuffer encode(ResultSet rs, int version)
         {
             CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.names.size() * rs.rows.size());
-            builder.add(Metadata.codec.encode(rs.metadata));
+            builder.add(Metadata.codec.encode(rs.metadata, version));
             builder.add(CBUtil.intToCB(rs.rows.size()));
 
             for (List<ByteBuffer> row : rs.rows)
@@ -241,6 +245,11 @@ public class ResultSet
             return true;
         }
 
+        public void setHasMorePages()
+        {
+            flags.add(Flag.HAS_MORE_PAGES);
+        }
+
         @Override
         public String toString()
         {
@@ -251,6 +260,8 @@ public class ResultSet
                 sb.append("(").append(name.ksName).append(", ").append(name.cfName).append(")");
                 sb.append(", ").append(name.type).append("]");
             }
+            if (flags.contains(Flag.HAS_MORE_PAGES))
+                sb.append(" (to be continued)");
             return sb.toString();
         }
 
@@ -286,7 +297,7 @@ public class ResultSet
                 return new Metadata(flags, names);
             }
 
-            public ChannelBuffer encode(Metadata m)
+            public ChannelBuffer encode(Metadata m, int version)
             {
                 boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
 
@@ -294,6 +305,9 @@ public class ResultSet
                 CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + m.names.size(), stringCount, 0);
 
                 ChannelBuffer header = ChannelBuffers.buffer(8);
+
+                assert version > 1 || !m.flags.contains(Flag.HAS_MORE_PAGES);
+
                 header.writeInt(Flag.serialize(m.flags));
                 header.writeInt(m.names.size());
                 builder.add(header);
@@ -322,13 +336,14 @@ public class ResultSet
     public static enum Flag
     {
         // The order of that enum matters!!
-        GLOBAL_TABLES_SPEC;
+        GLOBAL_TABLES_SPEC,
+        HAS_MORE_PAGES;
 
         public static EnumSet<Flag> deserialize(int flags)
         {
             EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
             Flag[] values = Flag.values();
-            for (int n = 0; n < 32; n++)
+            for (int n = 0; n < values.length; n++)
             {
                 if ((flags & (1 << n)) != 0)
                     set.add(values[n]);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 64468af..97d7be5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -40,7 +40,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index af1bd17..5e317aa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -41,7 +41,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
     throws RequestValidationException, RequestExecutionException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 1436811..6fbab72 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -133,7 +133,7 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 62bd976..85728bc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -332,7 +332,7 @@ public abstract class ModificationStatement implements CQLStatement
         return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
     throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 4d40e99..1c5f051 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -68,7 +68,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
     public void validate(ClientState state) throws RequestValidationException
     {}
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException
     {
         announceMigration();
         String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2be85c9..fac9c27 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -39,6 +39,9 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.RangeSliceVerbHandler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -54,6 +57,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class SelectStatement implements CQLStatement
 {
+    private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
+
     private final int boundTerms;
     public final CFDefinition cfDef;
     public final Parameters parameters;
@@ -130,23 +135,78 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
     {
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
+
         int limit = getLimit(variables);
         long now = System.currentTimeMillis();
-        List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
-                       : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
+        Pageable command = isKeyRange || usesSecondaryIndexing
+                         ? getRangeCommand(variables, limit, now)
+                         : new Pageable.ReadCommands(getSliceCommands(variables, limit, now));
+
+        // A count query will never be paged for the user, but we always page it internally to avoid OOM.
+        // If the user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
+        if (parameters.isCount && pageSize < 0)
+            pageSize = DEFAULT_COUNT_PAGE_SIZE;
+
+        if (pageSize < 0 || !QueryPagers.mayNeedPaging(command, pageSize))
+        {
+            return execute(command, cl, variables, limit, now);
+        }
+        else
+        {
+            QueryPager pager = QueryPagers.pager(command, cl);
+            return parameters.isCount
+                 ? pageCountQuery(pager, variables, pageSize)
+                 : setupPaging(pager, state, variables, limit, pageSize);
+        }
+    }
+
+    private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+    {
+        List<Row> rows = command instanceof Pageable.ReadCommands
+                       ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
+                       : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
 
         return processResults(rows, variables, limit, now);
     }
 
-    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    // TODO: we could probably refactor processResults so it doesn't need the variables, so we don't have to keep them around. But that can wait.
+    private ResultMessage.Rows setupPaging(QueryPager pager, QueryState state, List<ByteBuffer> variables, int limit, int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        List<Row> page = pager.fetchPage(pageSize);
+
+        ResultMessage.Rows msg = processResults(page, variables, limit, pager.timestamp());
+
+        // Don't bother setting up the pager if we actually don't need to.
+        if (pager.isExhausted())
+            return msg;
+
+        state.attachPager(pager, this, variables);
+        msg.result.metadata.setHasMorePages();
+        return msg;
+    }
+
+    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        int count = 0;
+        while (!pager.isExhausted())
+        {
+            int maxLimit = pager.maxRemaining();
+            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, pager.timestamp());
+            count += rset.rows.size();
+        }
+
+        ResultSet result = ResultSet.makeCountResult(keyspace(), columnFamily(), count, parameters.countAlias);
+        return new ResultMessage.Rows(result);
+    }
+
+    public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
         ResultSet rset = process(rows, variables, limit, now);
@@ -169,7 +229,7 @@ public class SelectStatement implements CQLStatement
         int limit = getLimit(variables);
         long now = System.currentTimeMillis();
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+                       ? getRangeCommand(variables, limit, now).executeLocally()
                        : readLocally(keyspace(), getSliceCommands(variables, limit, now));
 
         return processResults(rows, variables, limit, now);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 16445f5..a10415a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -54,7 +54,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 0db80bf..4806314 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -51,7 +51,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException
+    public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
new file mode 100644
index 0000000..1258344
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.thrift.IndexExpression;
+
+public abstract class AbstractRangeCommand implements IReadCommand
+{
+    public final String keyspace;
+    public final String columnFamily;
+    public final long timestamp;
+
+    public final AbstractBounds<RowPosition> keyRange;
+    public final IDiskAtomFilter predicate;
+    public final List<IndexExpression> rowFilter;
+
+    public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter)
+    {
+        this.keyspace = keyspace;
+        this.columnFamily = columnFamily;
+        this.timestamp = timestamp;
+        this.keyRange = keyRange;
+        this.predicate = predicate;
+        this.rowFilter = rowFilter;
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public abstract MessageOut<? extends AbstractRangeCommand> createMessage();
+    public abstract AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> range);
+    public abstract AbstractRangeCommand withUpdatedLimit(int newLimit);
+
+    public abstract int limit();
+    public abstract boolean countCQL3Rows();
+    public abstract List<Row> executeLocally();
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getRangeRpcTimeout();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 735f627..62a1fdb 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -49,9 +49,11 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -1304,9 +1306,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
     {
         ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
-        OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
-        filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
-        return removeDeletedCF(cf, gcBefore(filter.timestamp));
+        OnDiskAtomIterator ci = filter.getColumnFamilyIterator(cached);
+
+        int gcBefore = gcBefore(filter.timestamp);
+        filter.collateOnDiskAtom(cf, ci, gcBefore);
+        return removeDeletedCF(cf, gcBefore);
     }
 
     /**
@@ -1466,26 +1470,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
       * Iterate over a range of rows and columns from memtables/sstables.
       *
-      * @param range Either a Bounds, which includes start key, or a Range, which does not.
-      * @param columnFilter description of the columns we're interested in for each row
+      * @param range The range of keys and columns within those keys to fetch
      */
-    private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
-                                                       IDiskAtomFilter columnFilter,
-                                                       long timestamp)
+    private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
     {
-        assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
-
-        final RowPosition startWith = range.left;
-        final RowPosition stopAt = range.right;
-
-        QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
+        assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
 
-        final ViewFragment view = markReferenced(range);
-        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
+        final ViewFragment view = markReferenced(range.keyRange());
+        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
 
         try
         {
-            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
 
             // todo this could be pushed into SSTableScanner
             return new AbstractScanIterator()
@@ -1499,7 +1495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     Row current = iterator.next();
                     DecoratedKey key = current.key;
 
-                    if (!stopAt.isMinimum() && stopAt.compareTo(key) < 0)
+                    if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
                         return endOfData();
 
                     // skipping outside of assigned range
@@ -1527,43 +1523,108 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    @VisibleForTesting
     public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
                                    List<IndexExpression> rowFilter,
                                    IDiskAtomFilter columnFilter,
                                    int maxResults)
     {
-        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
+        return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis());
     }
 
     public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
                                    List<IndexExpression> rowFilter,
                                    IDiskAtomFilter columnFilter,
                                    int maxResults,
+                                   long now)
+    {
+        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now));
+    }
+
+    /**
+     * Allows generic range paging with the slice column filter.
+     * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
+     * And suppose we want to page through the query that, for all rows, returns the columns
+     * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
+     * and ending at (row Z, column 75), *but* that only returns columns in [25, 75].
+     * That is what this method allows. The columnRange is the "window" of columns we are interested
+     * in for each row, and columnStart (resp. columnStop) is the start (resp. end) for the first
+     * (resp. last) requested row.
+     */
+    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
+                                             SliceQueryFilter columnRange,
+                                             ByteBuffer columnStart,
+                                             ByteBuffer columnStop,
+                                             List<IndexExpression> rowFilter,
+                                             int maxResults,
+                                             long now)
+    {
+        DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator);
+        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, true, now);
+    }
+
+    public List<Row> getRangeSlice(AbstractBounds<RowPosition> range,
+                                   List<IndexExpression> rowFilter,
+                                   IDiskAtomFilter columnFilter,
+                                   int maxResults,
                                    long now,
                                    boolean countCQL3Rows,
                                    boolean isPaging)
     {
-        return filter(getSequentialIterator(range, columnFilter, now),
-                      ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
+        return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now));
     }
 
+    public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range,
+                                             IDiskAtomFilter columnFilter,
+                                             List<IndexExpression> rowFilter,
+                                             int maxResults,
+                                             boolean countCQL3Rows,
+                                             boolean isPaging,
+                                             long timestamp)
+    {
+        DataRange dataRange;
+        if (isPaging)
+        {
+            assert columnFilter instanceof SliceQueryFilter;
+            SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
+            assert sfilter.slices.length == 1;
+            SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
+            dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
+        }
+        else
+        {
+            dataRange = new DataRange(range, columnFilter);
+        }
+        return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp);
+    }
+
+    public List<Row> getRangeSlice(ExtendedFilter filter)
+    {
+        return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter);
+    }
+
+    @VisibleForTesting
     public List<Row> search(AbstractBounds<RowPosition> range,
                             List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
+                            IDiskAtomFilter dataFilter,
                             int maxResults)
     {
-        return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
+        return search(range, clause, dataFilter, maxResults, System.currentTimeMillis());
     }
 
     public List<Row> search(AbstractBounds<RowPosition> range,
                             List<IndexExpression> clause,
-                            IDiskAtomFilter columnFilter,
+                            IDiskAtomFilter dataFilter,
                             int maxResults,
-                            long now,
-                            boolean countCQL3Rows)
+                            long now)
+    {
+        return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now));
+    }
+
+    public List<Row> search(ExtendedFilter filter)
     {
-        Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
-        return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+        Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator()));
+        return indexManager.search(filter);
     }
 
     public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1584,7 +1645,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 if (rowIterator.needsFiltering())
                 {
-                    IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+                    IDiskAtomFilter extraFilter = filter.getExtraFilter(rawRow.key, data);
                     if (extraFilter != null)
                     {
                         ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
@@ -1594,12 +1655,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                     removeDroppedColumns(data);
 
-                    if (!filter.isSatisfiedBy(rawRow.key.key, data, null))
+                    if (!filter.isSatisfiedBy(rawRow.key, data, null))
                         continue;
 
                     logger.trace("{} satisfies all filter expressions", data);
                     // cut the resultset back to what was requested, if necessary
-                    data = filter.prune(data);
+                    data = filter.prune(rawRow.key, data);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
new file mode 100644
index 0000000..d764d60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.*;
+
+/**
+ * Groups key range and column filter for range queries.
+ *
+ * The main "trick" of this class is that the column filter can only
+ * be obtained by providing the row key on which the column filter will
+ * be applied (which we always know before actually querying the columns).
+ *
+ * This allows the paging DataRange to return one filter for most rows but
+ * potentially different ones for the starting and stopping keys. Could
+ * allow more fancy stuff in the future too, like column filters that
+ * depend on the actual key value :)
+ */
+public class DataRange
+{
+    private final AbstractBounds<RowPosition> keyRange;
+    protected IDiskAtomFilter columnFilter;
+    protected final boolean selectFullRow;
+
+    public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    {
+        assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
+
+        this.keyRange = range;
+        this.columnFilter = columnFilter;
+        this.selectFullRow = columnFilter instanceof SliceQueryFilter
+                           ? isFullRowSlice((SliceQueryFilter)columnFilter)
+                           : false;
+    }
+
+    public static boolean isFullRowSlice(SliceQueryFilter filter)
+    {
+        return filter.slices.length == 1
+            && filter.start().remaining() == 0
+            && filter.finish().remaining() == 0
+            && filter.count == Integer.MAX_VALUE;
+    }
+
+    public static DataRange allData(IPartitioner partitioner)
+    {
+        return forKeyRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+    }
+
+    public static DataRange forKeyRange(Range<Token> keyRange)
+    {
+        return new DataRange(keyRange.toRowBounds(), new IdentityQueryFilter());
+    }
+
+    public AbstractBounds<RowPosition> keyRange()
+    {
+        return keyRange;
+    }
+
+    public RowPosition startKey()
+    {
+        return keyRange.left;
+    }
+
+    public RowPosition stopKey()
+    {
+        return keyRange.right;
+    }
+
+    public boolean contains(RowPosition pos)
+    {
+        return keyRange.contains(pos);
+    }
+
+    public int getLiveCount(ColumnFamily data, long now)
+    {
+        return columnFilter instanceof SliceQueryFilter
+             ? ((SliceQueryFilter)columnFilter).lastCounted()
+             : columnFilter.getLiveCount(data, now);
+    }
+
+    public boolean selectsFullRowFor(ByteBuffer rowKey)
+    {
+        return selectFullRow;
+    }
+
+    public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+    {
+        return columnFilter;
+    }
+
+    public void updateColumnsLimit(int count)
+    {
+        columnFilter.updateColumnsLimit(count);
+    }
+
+    public static class Paging extends DataRange
+    {
+        private final SliceQueryFilter sliceFilter;
+        private final Comparator<ByteBuffer> comparator;
+        private final ByteBuffer columnStart;
+        private final ByteBuffer columnFinish;
+
+        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+        {
+            super(range, filter);
+
+            this.sliceFilter = filter;
+            this.comparator = comparator;
+            this.columnStart = columnStart;
+            this.columnFinish = columnFinish;
+        }
+
+        public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+        {
+            this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+        }
+
+        @Override
+        public boolean selectsFullRowFor(ByteBuffer rowKey)
+        {
+            // If the initial filter is not a full-row filter, don't bother
+            if (!selectFullRow)
+                return false;
+
+            if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
+                return selectFullRow;
+
+            return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
+        }
+
+        private boolean equals(RowPosition pos, ByteBuffer rowKey)
+        {
+            return pos instanceof DecoratedKey && ((DecoratedKey)pos).key.equals(rowKey);
+        }
+
+        @Override
+        public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+        {
+            /*
+             * We have that ugly hack that for slice queries, when we ask for
+             * the live count, we reach into the query filter to get the last
+             * counted number of columns to avoid recounting.
+             * Maybe we should just remove that hack, but in the meantime, we
+             * need to keep a reference to the last returned filter.
+             */
+            columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey)
+                         ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey))
+                         : sliceFilter;
+            return columnFilter;
+        }
+
+        private ColumnSlice[] slicesForKey(ByteBuffer key)
+        {
+            // We don't call that until it's necessary, so assume we have to do some hard work
+            ByteBuffer newStart = equals(startKey(), key) ? columnStart : null;
+            ByteBuffer newFinish = equals(stopKey(), key) ? columnFinish : null;
+
+            List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+
+            for (ColumnSlice slice : sliceFilter.slices)
+            {
+                if (newStart != null)
+                {
+                    if (slice.isBefore(comparator, newStart))
+                        continue; // we skip that slice
+
+                    if (slice.includes(comparator, newStart))
+                        slice = new ColumnSlice(newStart, slice.finish);
+
+                    // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+                    newStart = null;
+                }
+
+                assert newStart == null;
+                if (newFinish != null && !slice.isBefore(comparator, newFinish))
+                {
+                    if (slice.includes(comparator, newFinish))
+                        newSlices.add(new ColumnSlice(slice.start, newFinish));
+                    // In any case, we're done
+                    break;
+                }
+                newSlices.add(slice);
+            }
+
+            return newSlices.toArray(new ColumnSlice[newSlices.size()]);
+        }
+
+        @Override
+        public void updateColumnsLimit(int count)
+        {
+            columnFilter.updateColumnsLimit(count);
+            sliceFilter.updateColumnsLimit(count);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index e89c769..3a30701 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -480,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
         IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
-        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
+        List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
         for (Row row : rows)
         {
             UUID hostId = UUIDGen.getUUID(row.key.key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ad6258a..c31c882 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.StageManager;
@@ -336,52 +335,6 @@ public class Memtable
         return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
     }
 
-    /**
-     * obtain an iterator of columns in this memtable in the specified order starting from a given column.
-     */
-    public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
-    {
-        assert cf != null;
-        final Iterator<Column> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
-
-        return new OnDiskAtomIterator()
-        {
-            public ColumnFamily getColumnFamily()
-            {
-                return cf;
-            }
-
-            public DecoratedKey getKey()
-            {
-                return key;
-            }
-
-            public boolean hasNext()
-            {
-                return filteredIter.hasNext();
-            }
-
-            public OnDiskAtom next()
-            {
-                return filteredIter.next();
-            }
-
-            public void close() throws IOException { }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    public static OnDiskAtomIterator getNamesIterator(final DecoratedKey key, final ColumnFamily cf, final NamesQueryFilter filter)
-    {
-        assert cf != null;
-
-        return new ByNameColumnIterator(filter.columns.iterator(), cf, key);
-    }
-
     public ColumnFamily getColumnFamily(DecoratedKey key)
     {
         return rows.get(key);
@@ -392,44 +345,6 @@ public class Memtable
         return creationTime;
     }
 
-    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
-    {
-        private final ColumnFamily cf;
-        private final DecoratedKey key;
-        private final Iterator<ByteBuffer> iter;
-
-        public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
-        {
-            this.iter = iter;
-            this.cf = cf;
-            this.key = key;
-        }
-
-        public ColumnFamily getColumnFamily()
-        {
-            return cf;
-        }
-
-        public DecoratedKey getKey()
-        {
-            return key;
-        }
-
-        protected OnDiskAtom computeNext()
-        {
-            while (iter.hasNext())
-            {
-                ByteBuffer current = iter.next();
-                Column column = cf.getColumn(current);
-                if (column != null)
-                    return column;
-            }
-            return endOfData();
-        }
-
-        public void close() throws IOException { }
-    }
-
     class FlushRunnable extends DiskAwareRunnable
     {
         private final CountDownLatch latch;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
new file mode 100644
index 0000000..0e1fa4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A range command that reads one page of a slice query over a range of row keys.
+ *
+ * Besides the slice predicate, it carries the column names at which the page starts and stops.
+ */
+public class PagedRangeCommand extends AbstractRangeCommand
+{
+    public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
+
+    // The start and stop of the page (see forSubRange() for which sub-ranges they are handed to)
+    public final ByteBuffer start;
+    public final ByteBuffer stop;
+    public final int limit;
+
+    public PagedRangeCommand(String keyspace,
+                             String columnFamily,
+                             long timestamp,
+                             AbstractBounds<RowPosition> keyRange,
+                             SliceQueryFilter predicate,
+                             ByteBuffer start,
+                             ByteBuffer stop,
+                             List<IndexExpression> rowFilter,
+                             int limit)
+    {
+        super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
+        this.start = start;
+        this.stop = stop;
+        this.limit = limit;
+    }
+
+    /** Wraps this command in a PAGED_RANGE message using this class's serializer. */
+    public MessageOut<PagedRangeCommand> createMessage()
+    {
+        return new MessageOut<PagedRangeCommand>(MessagingService.Verb.PAGED_RANGE, this, serializer);
+    }
+
+    /**
+     * Returns a copy of this command restricted to subRange. The page start (resp. stop) is only kept when
+     * subRange has the same left (resp. right) bound as keyRange; otherwise the start (resp. finish) of the
+     * slice predicate is used instead.
+     */
+    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+    {
+        SliceQueryFilter slices = (SliceQueryFilter)predicate;
+        ByteBuffer newStart = subRange.left.equals(keyRange.left) ? start : slices.start();
+        ByteBuffer newStop = subRange.right.equals(keyRange.right) ? stop : slices.finish();
+        return new PagedRangeCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     subRange,
+                                     slices,
+                                     newStart,
+                                     newStop,
+                                     rowFilter,
+                                     limit);
+    }
+
+    /** Returns a copy of this command that differs only by its limit. */
+    public AbstractRangeCommand withUpdatedLimit(int newLimit)
+    {
+        return new PagedRangeCommand(keyspace,
+                                     columnFamily,
+                                     timestamp,
+                                     keyRange,
+                                     (SliceQueryFilter)predicate,
+                                     start,
+                                     stop,
+                                     rowFilter,
+                                     newLimit);
+    }
+
+    public int limit()
+    {
+        return limit;
+    }
+
+    /** Always true for a paged range command. */
+    public boolean countCQL3Rows()
+    {
+        return true;
+    }
+
+    /**
+     * Runs this command against the local column family store: through cfs.search() when the index manager
+     * reports an index for the row filter, through cfs.getRangeSlice() otherwise.
+     */
+    public List<Row> executeLocally()
+    {
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, timestamp);
+        if (cfs.indexManager.hasIndexFor(rowFilter))
+            return cfs.search(exFilter);
+        else
+            return cfs.getRangeSlice(exFilter);
+    }
+
+    /**
+     * Wire format, in order: keyspace, column family, timestamp, key range, slice filter, page start,
+     * page stop, the row filter (expression count, then name/operator/value of each expression), limit.
+     * serialize(), deserialize() and serializedSize() must agree on that layout, length prefixes included.
+     */
+    private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
+    {
+        public void serialize(PagedRangeCommand cmd, DataOutput out, int version) throws IOException
+        {
+            out.writeUTF(cmd.keyspace);
+            out.writeUTF(cmd.columnFamily);
+            out.writeLong(cmd.timestamp);
+
+            AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
+
+            // SliceQueryFilter (the count is not used)
+            SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
+            SliceQueryFilter.serializer.serialize(filter, out, version);
+
+            // The start and stop of the page
+            ByteBufferUtil.writeWithShortLength(cmd.start, out);
+            ByteBufferUtil.writeWithShortLength(cmd.stop, out);
+
+            out.writeInt(cmd.rowFilter.size());
+            for (IndexExpression expr : cmd.rowFilter)
+            {
+                ByteBufferUtil.writeWithShortLength(expr.column_name, out);
+                out.writeInt(expr.op.getValue());
+                // Short length prefix: deserialize() reads the value back with readWithShortLength()
+                ByteBufferUtil.writeWithShortLength(expr.value, out);
+            }
+
+            out.writeInt(cmd.limit);
+        }
+
+        public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
+        {
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+            long timestamp = in.readLong();
+
+            AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
+
+            SliceQueryFilter predicate = SliceQueryFilter.serializer.deserialize(in, version);
+
+            ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
+            ByteBuffer stop = ByteBufferUtil.readWithShortLength(in);
+
+            int filterCount = in.readInt();
+            List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
+            for (int i = 0; i < filterCount; i++)
+            {
+                IndexExpression expr = new IndexExpression(ByteBufferUtil.readWithShortLength(in),
+                                                           IndexOperator.findByValue(in.readInt()),
+                                                           ByteBufferUtil.readWithShortLength(in));
+                rowFilter.add(expr);
+            }
+
+            int limit = in.readInt();
+            return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
+        }
+
+        public long serializedSize(PagedRangeCommand cmd, int version)
+        {
+            long size = 0;
+
+            size += TypeSizes.NATIVE.sizeof(cmd.keyspace);
+            size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
+            size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
+
+            size += AbstractBounds.serializer.serializedSize(cmd.keyRange, version);
+
+            size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)cmd.predicate, version);
+
+            // The start and stop of the page
+            size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.start);
+            size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.stop);
+
+            size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
+            for (IndexExpression expr : cmd.rowFilter)
+            {
+                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
+                size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
+                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
+            }
+
+            size += TypeSizes.NATIVE.sizeof(cmd.limit);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c7e71f3..c037518 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -23,10 +23,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -34,25 +35,15 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.service.pager.Pageable;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class RangeSliceCommand implements IReadCommand
+public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
 {
     public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
 
-    public final String keyspace;
-
-    public final String column_family;
-
-    public final long timestamp;
-
-    public final IDiskAtomFilter predicate;
-    public final List<IndexExpression> row_filter;
-
-    public final AbstractBounds<RowPosition> range;
     public final int maxResults;
     public final boolean countCQL3Rows;
     public final boolean isPaging;
@@ -88,12 +79,7 @@ public class RangeSliceCommand implements IReadCommand
                              boolean countCQL3Rows,
                              boolean isPaging)
     {
-        this.keyspace = keyspace;
-        this.column_family = column_family;
-        this.timestamp = timestamp;
-        this.predicate = predicate;
-        this.range = range;
-        this.row_filter = row_filter;
+        super(keyspace, column_family, timestamp, range, predicate, row_filter);
         this.maxResults = maxResults;
         this.countCQL3Rows = countCQL3Rows;
         this.isPaging = isPaging;
@@ -104,29 +90,66 @@ public class RangeSliceCommand implements IReadCommand
         return new MessageOut<RangeSliceCommand>(MessagingService.Verb.RANGE_SLICE, this, serializer);
     }
 
+    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+    {
+        return new RangeSliceCommand(keyspace, // a copy of this command in which only the key range changes
+                                     columnFamily,
+                                     timestamp,
+                                     predicate,
+                                     subRange, // takes the place of keyRange
+                                     rowFilter,
+                                     maxResults,
+                                     countCQL3Rows,
+                                     isPaging);
+    }
+
+    public AbstractRangeCommand withUpdatedLimit(int newLimit)
+    {
+        return new RangeSliceCommand(keyspace, // a copy of this command in which only the result limit changes
+                                     columnFamily,
+                                     timestamp,
+                                     predicate,
+                                     keyRange,
+                                     rowFilter,
+                                     newLimit, // takes the place of maxResults
+                                     countCQL3Rows,
+                                     isPaging);
+    }
+
+    public int limit()
+    {
+        return maxResults; // the limit of a range slice command is the maxResults it was built with
+    }
+
+    public boolean countCQL3Rows()
+    {
+        return countCQL3Rows; // accessor for the flag given at construction
+    }
+
+    public List<Row> executeLocally()
+    {
+        ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
+        if (cfs.indexManager.hasIndexFor(rowFilter))
+            return cfs.search(exFilter);
+        else
+            return cfs.getRangeSlice(exFilter);
+    }
+
     @Override
     public String toString()
     {
         return "RangeSliceCommand{" +
                "keyspace='" + keyspace + '\'' +
-               ", column_family='" + column_family + '\'' +
-               ", timestamp='" + timestamp + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", timestamp=" + timestamp +
                ", predicate=" + predicate +
-               ", range=" + range +
-               ", row_filter =" + row_filter +
+               ", range=" + keyRange +
+               ", rowFilter =" + rowFilter +
                ", maxResults=" + maxResults +
                ", countCQL3Rows=" + countCQL3Rows +
-               '}';
-    }
-
-    public String getKeyspace()
-    {
-        return keyspace;
-    }
-
-    public long getTimeout()
-    {
-        return DatabaseDescriptor.getRangeRpcTimeout();
+               "}";
     }
 }
 
@@ -135,7 +158,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
     {
         out.writeUTF(sliceCommand.keyspace);
-        out.writeUTF(sliceCommand.column_family);
+        out.writeUTF(sliceCommand.columnFamily);
 
         if (version >= MessagingService.VERSION_20)
             out.writeLong(sliceCommand.timestamp);
@@ -147,7 +170,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
             // must extract the super column name from the predicate (and
             // modify the predicate accordingly)
             ByteBuffer sc = null;
-            CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.column_family);
+            CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.columnFamily);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -162,21 +185,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         IDiskAtomFilter.Serializer.instance.serialize(filter, out, version);
 
-        if (sliceCommand.row_filter == null)
+        if (sliceCommand.rowFilter == null)
         {
             out.writeInt(0);
         }
         else
         {
-            out.writeInt(sliceCommand.row_filter.size());
-            for (IndexExpression expr : sliceCommand.row_filter)
+            out.writeInt(sliceCommand.rowFilter.size());
+            for (IndexExpression expr : sliceCommand.rowFilter)
             {
                 ByteBufferUtil.writeWithShortLength(expr.column_name, out);
                 out.writeInt(expr.op.getValue());
                 ByteBufferUtil.writeWithShortLength(expr.value, out);
             }
         }
-        AbstractBounds.serializer.serialize(sliceCommand.range, out, version);
+        AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version);
         out.writeInt(sliceCommand.maxResults);
         out.writeBoolean(sliceCommand.countCQL3Rows);
         out.writeBoolean(sliceCommand.isPaging);
@@ -246,7 +269,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
     public long serializedSize(RangeSliceCommand rsc, int version)
     {
         long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
-        size += TypeSizes.NATIVE.sizeof(rsc.column_family);
+        size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
 
         if (version >= MessagingService.VERSION_20)
             size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
@@ -255,7 +278,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
         if (version < MessagingService.VERSION_20)
         {
             ByteBuffer sc = null;
-            CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.column_family);
+            CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
             if (metadata.cfType == ColumnFamilyType.Super)
             {
                 SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -276,21 +299,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
 
         size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
 
-        if (rsc.row_filter == null)
+        if (rsc.rowFilter == null)
         {
             size += TypeSizes.NATIVE.sizeof(0);
         }
         else
         {
-            size += TypeSizes.NATIVE.sizeof(rsc.row_filter.size());
-            for (IndexExpression expr : rsc.row_filter)
+            size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
+            for (IndexExpression expr : rsc.rowFilter)
             {
                 size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
                 size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
                 size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
             }
         }
-        size += AbstractBounds.serializer.serializedSize(rsc.range, version);
+        size += AbstractBounds.serializer.serializedSize(rsc.keyRange, version);
         size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
         size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
         size += TypeSizes.NATIVE.sizeof(rsc.isPaging);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 61a2478..3031da8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,8 +34,9 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.RowDataResolver;
+import org.apache.cassandra.service.pager.Pageable;
 
-public abstract class ReadCommand implements IReadCommand
+public abstract class ReadCommand implements IReadCommand, Pageable
 {
     public enum Type
     {