You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:40 UTC

[2/4] Add auto paging capability to the native protocol

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
new file mode 100644
index 0000000..460bc44
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+abstract class AbstractQueryPager implements QueryPager
+{
+    private final ConsistencyLevel consistencyLevel;
+    private final boolean localQuery;
+
+    protected final CFMetaData cfm;
+    protected final IDiskAtomFilter columnFilter;
+    private final long timestamp;
+
+    // Upper bound on the live cells/CQL3 rows still to return; starts at the user limit (toFetch).
+    private volatile int remaining;
+    // Set once a query came back empty or with fewer live results than requested.
+    private volatile boolean exhausted;
+    // Whether recordLast() recorded the last element of the previous page. If so, the next
+    // query asks for one extra element since it may return that last element again.
+    private volatile boolean lastWasRecorded;
+
+    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
+                                 int toFetch,
+                                 boolean localQuery,
+                                 String keyspace,
+                                 String columnFamily,
+                                 IDiskAtomFilter columnFilter,
+                                 long timestamp)
+    {
+        this.consistencyLevel = consistencyLevel;
+        this.localQuery = localQuery;
+
+        this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+        this.columnFilter = columnFilter;
+        this.timestamp = timestamp;
+
+        this.remaining = toFetch;
+    }
+
+    /**
+     * Fetches the next page, of at most {@code pageSize} live cells/CQL3 rows.
+     *
+     * Rows without any column are filtered out. If the page starts with the element that
+     * ended the previous page, that element is dropped; otherwise, if one extra element was
+     * requested and the page is full, the extra trailing element is dropped.
+     *
+     * @param pageSize the maximum number of live elements to return.
+     * @return the rows of the page; always empty once the pager is exhausted.
+     */
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        if (isExhausted())
+            return Collections.emptyList();
+
+        int currentPageSize = nextPageSize(pageSize);
+        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+
+        if (rows.isEmpty())
+        {
+            exhausted = true;
+            return Collections.emptyList();
+        }
+
+        int liveCount = getPageLiveCount(rows);
+        remaining -= liveCount;
+
+        // If we've got less than requested, there is no more query to do (but
+        // we still need to return the current page)
+        if (liveCount < currentPageSize)
+            exhausted = true;
+
+        // If it's not the first query and the first column is the last one returned (likely
+        // but not certain since paging can race with deletes/expiration), then remove the
+        // first column.
+        if (containsPreviousLast(rows.get(0)))
+        {
+            rows = discardFirst(rows);
+            remaining++;
+        }
+        // Otherwise, if 'lastWasRecorded', we queried for one more than the page size,
+        // so if the page is full, trim the last entry
+        else if (lastWasRecorded && !exhausted)
+        {
+            // We've asked for one more than necessary
+            rows = discardLast(rows);
+            remaining++;
+        }
+
+        // Let the subclass record the last returned row (presumably so the next query
+        // can resume from it); its answer drives the "+1" in nextPageSize().
+        if (!isExhausted())
+            lastWasRecorded = recordLast(rows.get(rows.size() - 1));
+
+        return rows;
+    }
+
+    /**
+     * Removes the rows whose column family is null or has no column.
+     * Returns {@code result} itself when there is nothing to remove.
+     */
+    private List<Row> filterEmpty(List<Row> result)
+    {
+        for (Row row : result)
+        {
+            if (isEmptyRow(row))
+            {
+                // At least one row must go: copy the non-empty rows only
+                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
+                for (Row row2 : result)
+                {
+                    // Test each candidate (row2), not the row that triggered the copy,
+                    // otherwise every single row would be discarded
+                    if (isEmptyRow(row2))
+                        continue;
+
+                    newResult.add(row2);
+                }
+                return newResult;
+            }
+        }
+        return result;
+    }
+
+    // Whether the row carries no column at all (null or empty column family)
+    private static boolean isEmptyRow(Row row)
+    {
+        return row.cf == null || row.cf.getColumnCount() == 0;
+    }
+
+    public boolean isExhausted()
+    {
+        return exhausted || remaining == 0;
+    }
+
+    public int maxRemaining()
+    {
+        return remaining;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    // Number of elements to query: the page size capped by what remains, plus one
+    // when the previous last element was recorded (it may be returned again).
+    private int nextPageSize(int pageSize)
+    {
+        return Math.min(remaining, pageSize) + (lastWasRecorded ? 1 : 0);
+    }
+
+    /**
+     * Returns a counter of live elements for this query's filter. The helpers below call
+     * this once per count and presumably rely on each call starting from zero.
+     */
+    public ColumnCounter columnCounter()
+    {
+        return columnFilter.columnCounter(cfm.comparator, timestamp);
+    }
+
+    /**
+     * Queries the next page of at most {@code pageSize} live elements
+     * (presumably resuming after the last recorded element, if any).
+     */
+    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
+    /** Whether {@code first} starts with the last element returned by the previous page. */
+    protected abstract boolean containsPreviousLast(Row first);
+    /** Records {@code last} as the end of the current page; returns whether it was recorded. */
+    protected abstract boolean recordLast(Row last);
+
+    // Drops the first live element of the first row, dropping the row itself if nothing is left.
+    private List<Row> discardFirst(List<Row> rows)
+    {
+        Row first = rows.get(0);
+        ColumnFamily newCf = discardFirst(first.cf);
+
+        int count = newCf.getColumnCount();
+        List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+        if (count != 0)
+            newRows.add(new Row(first.key, newCf));
+        newRows.addAll(rows.subList(1, rows.size()));
+
+        return newRows;
+    }
+
+    // Drops the last live element of the last row, dropping the row itself if nothing is left.
+    private List<Row> discardLast(List<Row> rows)
+    {
+        Row last = rows.get(rows.size() - 1);
+        ColumnFamily newCf = discardLast(last.cf);
+
+        int count = newCf.getColumnCount();
+        List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+        newRows.addAll(rows.subList(0, rows.size() - 1));
+        if (count != 0)
+            newRows.add(new Row(last.key, newCf));
+
+        return newRows;
+    }
+
+    // Sum of the live elements (cells or CQL3 rows) of every row of the page.
+    private int getPageLiveCount(List<Row> page)
+    {
+        int count = 0;
+        for (Row row : page)
+            count += columnCounter().countAll(row.cf).live();
+        return count;
+    }
+
+    /**
+     * Returns a copy of {@code cf} without its first live element: everything up to (but
+     * excluding) the second live element is skipped, the rest is copied.
+     */
+    private ColumnFamily discardFirst(ColumnFamily cf)
+    {
+        ColumnFamily copy = cf.cloneMeShallow();
+        ColumnCounter counter = columnCounter();
+
+        Iterator<Column> iter = cf.iterator();
+        // Skip until the counter sees the second live element, then copy everything left
+        while (iter.hasNext())
+        {
+            Column c = iter.next();
+            counter.count(c, cf);
+            if (counter.live() > 1)
+            {
+                copy.addColumn(c);
+                while (iter.hasNext())
+                    copy.addColumn(iter.next());
+            }
+        }
+        return copy;
+    }
+
+    /**
+     * Returns a copy of {@code cf} without its last live element: the last live element
+     * and every column after it are dropped, everything before it is copied.
+     */
+    private ColumnFamily discardLast(ColumnFamily cf)
+    {
+        ColumnFamily copy = cf.cloneMeShallow();
+        // Redoing the counting like that is not extremely efficient, but
+        // discardLast is only called in case of a race between paging and
+        // a deletion, which is pretty unlikely, so probably not a big deal
+        int liveCount = columnCounter().countAll(cf).live();
+
+        ColumnCounter counter = columnCounter();
+        // Keep columns only while the last live element hasn't been reached
+        for (Column c : cf)
+        {
+            counter.count(c, cf);
+            if (counter.live() < liveCount)
+                copy.addColumn(c);
+        }
+        return copy;
+    }
+
+    // Name of the first column of cf (assumes cf has at least one column).
+    protected static ByteBuffer firstName(ColumnFamily cf)
+    {
+        return cf.iterator().next().name();
+    }
+
+    // Name of the last column of cf (assumes cf has at least one column).
+    protected static ByteBuffer lastName(ColumnFamily cf)
+    {
+        return cf.getReverseSortedColumns().iterator().next().name();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
new file mode 100644
index 0000000..ef82535
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
+/**
+ * Pager over a list of ReadCommand.
+ *
+ * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
+ * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
+ * paging the commands one at a time under-performs compared to parallelizing. On the other hand, if we parallelize
+ * and each command returns pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * defeats the purpose of paging.
+ *
+ * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
+ * cfs meanRowSize to decide if parallelizing some of the commands might be worth it while being confident we don't
+ * blow out memory.
+ */
+class MultiPartitionPager implements QueryPager
+{
+    // One pager per command; they are consumed strictly one after the other
+    private final SinglePartitionPager[] pagers;
+    // Timestamp shared by all the commands (enforced by the constructor)
+    private final long timestamp;
+
+    // Index of the first pager that may not be exhausted yet; advanced by isExhausted()
+    private volatile int current;
+
+    /**
+     * Creates one single-partition pager per command.
+     *
+     * @throws IllegalArgumentException if the commands don't all share the same timestamp.
+     */
+    MultiPartitionPager(List<ReadCommand> commands, final ConsistencyLevel consistencyLevel, final boolean localQuery)
+    {
+        this.pagers = new SinglePartitionPager[commands.size()];
+
+        long tstamp = -1;
+        for (int i = 0; i < commands.size(); i++)
+        {
+            ReadCommand command = commands.get(i);
+            if (tstamp == -1)
+                tstamp = command.timestamp;
+            else if (tstamp != command.timestamp)
+                throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
+
+            pagers[i] = command instanceof SliceFromReadCommand
+                      ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery)
+                      : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+        }
+        timestamp = tstamp;
+    }
+
+    /**
+     * Whether every pager is exhausted. As a side effect, moves {@code current}
+     * forward to the first pager that is not exhausted.
+     */
+    public boolean isExhausted()
+    {
+        while (current < pagers.length)
+        {
+            if (!pagers[current].isExhausted())
+                return false;
+
+            current++;
+        }
+        return true;
+    }
+
+    /**
+     * Fetches up to {@code pageSize} live elements, draining the pagers in order and
+     * moving on to the next pager once the current one is exhausted.
+     */
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        int remaining = pageSize;
+        List<Row> result = new ArrayList<Row>();
+
+        while (!isExhausted() && remaining > 0)
+        {
+            // Exhausted also sets us on the first non-exhausted pager
+            List<Row> page = pagers[current].fetchPage(remaining);
+            if (page.isEmpty())
+                continue;
+
+            // Only the first row of the page is used (presumably a single-partition page holds one row)
+            Row row = page.get(0);
+            remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+            result.add(row);
+        }
+
+        return result;
+    }
+
+    /**
+     * Sum of the remaining upper bounds of the non-skipped pagers, saturated at
+     * Integer.MAX_VALUE.
+     */
+    public int maxRemaining()
+    {
+        // Accumulate in a long: each pager may report up to Integer.MAX_VALUE, and an
+        // int sum would silently wrap around to a negative value.
+        long max = 0;
+        for (int i = current; i < pagers.length; i++)
+            max += pagers[i].maxRemaining();
+        return (int)Math.min(max, Integer.MAX_VALUE);
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
new file mode 100644
index 0000000..82e7376
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceByNamesReadCommand.
+ */
+public class NamesQueryPager implements SinglePartitionPager
+{
+    private final SliceByNamesReadCommand command;
+    private final ConsistencyLevel consistencyLevel;
+    private final boolean localQuery;
+
+    // Flipped to true when the one and only query is issued; this pager never queries twice.
+    private volatile boolean queried;
+
+    /**
+     * For now, this is only used by CQL3. There, a names query can never
+     * yield more than one CQL3 row, so no paging is needed and this is straightforward.
+     *
+     * For thrift, we could imagine needing to page, though even then it's very
+     * unlikely unless the pageSize is very small.
+     *
+     * In any case, fetchPage currently asserts that for a "thrift" query (i.e. a query that
+     * counts every cell individually) the names filter asks for no more than pageSize columns.
+     */
+    // Don't use directly, use the QueryPagers methods instead
+    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        this.command = command;
+        this.consistencyLevel = consistencyLevel;
+        this.localQuery = localQuery;
+    }
+
+    public ColumnCounter columnCounter()
+    {
+        // NamesQueryFilter.columnCounter doesn't use its comparator argument, hence the null
+        return command.filter.columnCounter(null, command.timestamp);
+    }
+
+    public boolean isExhausted()
+    {
+        return queried;
+    }
+
+    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    {
+        // No in-row paging here: see the constructor comment
+        assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
+
+        if (isExhausted())
+            return Collections.<Row>emptyList();
+
+        queried = true;
+        if (localQuery)
+            return Collections.singletonList(command.getRow(Table.open(command.table)));
+
+        List<ReadCommand> toRead = Collections.<ReadCommand>singletonList(command);
+        return StorageProxy.read(toRead, consistencyLevel);
+    }
+
+    public int maxRemaining()
+    {
+        // Nothing left after the single query; before it, a CQL3 names query yields at most
+        // one CQL3 row while a cell-counting one may return every requested column.
+        if (queried)
+            return 0;
+        else if (command.filter.countCQL3Rows())
+            return 1;
+        else
+            return command.filter.columns.size();
+    }
+
+    public long timestamp()
+    {
+        return command.timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
new file mode 100644
index 0000000..3a69bf4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/Pageable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.ReadCommand;
+
+/**
+ * Marker interface for commands that can be paged.
+ */
+public interface Pageable
+{
+    /**
+     * A list of read commands that is paged as a single Pageable.
+     */
+    class ReadCommands implements Pageable
+    {
+        public final List<ReadCommand> commands;
+
+        public ReadCommands(List<ReadCommand> readCommands)
+        {
+            commands = readCommands;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
new file mode 100644
index 0000000..a390859
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Perform a query, paging it by page of a given size.
+ *
+ * This is essentially an iterator of pages. Each call to fetchPage() will
+ * return the next page (i.e. the next list of rows) and isExhausted()
+ * indicates whether paging is over. The pageSize will
+ * either be in terms of cells or in terms of CQL3 rows, depending on the
+ * parameters of the command we page.
+ *
+ * Please note that the pager might page within rows, so there is no guarantee
+ * that successive pages won't return the same row (though with different
+ * columns every time).
+ *
+ * Also, there is no guarantee that fetchPage() won't return an empty list,
+ * even if isExhausted() returns false (but it is guaranteed to return an empty
+ * list *if* isExhausted() returns true). Indeed, isExhausted() does *not*
+ * trigger a query, so in some (fairly rare) cases we might not know the paging
+ * is done even though it is.
+ */
+public interface QueryPager
+{
+    /**
+     * Fetches the next page of results.
+     *
+     * @param pageSize the maximum number of elements (cells or CQL3 rows) in the returned page.
+     * @return the rows of the page; may be empty even when the pager was not known to be exhausted.
+     * @throws RequestValidationException propagated from the underlying query.
+     * @throws RequestExecutionException propagated from the underlying query.
+     */
+    List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+
+    /**
+     * Tells whether this pager is done, i.e. whether a further call to
+     * fetchPage is guaranteed to return nothing.
+     *
+     * @return true if the pager is exhausted.
+     */
+    boolean isExhausted();
+
+    /**
+     * Upper bound on the number of cells/CQL3 rows still to be returned: the
+     * initial user limit minus what has already been returned. This is a bound,
+     * not the exact number that will actually be returned.
+     */
+    int maxRemaining();
+
+    /**
+     * The timestamp used by the last page.
+     */
+    long timestamp();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
new file mode 100644
index 0000000..9bc3afd
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Static utility methods to create query pagers.
+ */
+public class QueryPagers
+{
+    private QueryPagers() {}
+
+    /**
+     * Upper bound on the number of cells/CQL3 rows a single read command may return:
+     * 1 for a names query counting CQL3 rows, the number of requested names for other
+     * names queries, and the slice count for slice queries.
+     */
+    private static int maxQueried(ReadCommand command)
+    {
+        if (command instanceof SliceByNamesReadCommand)
+        {
+            NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
+            return filter.countCQL3Rows() ? 1 : filter.columns.size();
+        }
+        else
+        {
+            SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
+            return filter.count;
+        }
+    }
+
+    /**
+     * Whether {@code command} may return more than {@code pageSize} elements and so
+     * may need paging. Range slices are always considered to possibly need paging.
+     */
+    public static boolean mayNeedPaging(Pageable command, int pageSize)
+    {
+        if (command instanceof Pageable.ReadCommands)
+        {
+            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+
+            // Accumulate in a long: a single command's bound may be as large as
+            // Integer.MAX_VALUE, and an int sum could wrap around to a negative value,
+            // wrongly reporting that no paging is needed.
+            long maxQueried = 0;
+            for (ReadCommand readCmd : commands)
+                maxQueried += maxQueried(readCmd);
+
+            return maxQueried > pageSize;
+        }
+        else if (command instanceof ReadCommand)
+        {
+            return maxQueried((ReadCommand)command) > pageSize;
+        }
+        else
+        {
+            assert command instanceof RangeSliceCommand;
+            // We can never be sure a range slice won't need paging
+            return true;
+        }
+    }
+
+    // Creates the pager matching a single-partition read command (names or slice query)
+    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local)
+    {
+        if (command instanceof SliceByNamesReadCommand)
+            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+        else
+            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local);
+    }
+
+    // Picks the pager implementation matching the concrete kind of Pageable
+    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local)
+    {
+        if (command instanceof Pageable.ReadCommands)
+        {
+            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+            if (commands.size() == 1)
+                return pager(commands.get(0), consistencyLevel, local);
+
+            return new MultiPartitionPager(commands, consistencyLevel, local);
+        }
+        else if (command instanceof ReadCommand)
+        {
+            return pager((ReadCommand)command, consistencyLevel, local);
+        }
+        else
+        {
+            assert command instanceof RangeSliceCommand;
+            RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
+            if (rangeCommand.predicate instanceof NamesQueryFilter)
+                return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local);
+            else
+                return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local);
+        }
+    }
+
+    /**
+     * Creates a (non-local) pager for {@code command}, querying at {@code consistencyLevel}.
+     */
+    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
+    {
+        return pager(command, consistencyLevel, false);
+    }
+
+    /**
+     * Creates a pager for {@code command} that only queries local data (no consistency level).
+     */
+    public static QueryPager localPager(Pageable command)
+    {
+        return pager(command, null, true);
+    }
+
+    /**
+     * Convenience method to (locally) page an internal row, returning one column family per page.
+     * Used by secondary indexing to go through a wide row page by page.
+     */
+    public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
+    {
+        SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
+        final SliceQueryPager pager = new SliceQueryPager(command, null, true);
+
+        return new Iterator<ColumnFamily>()
+        {
+            // We don't use AbstractIterator because we don't want hasNext() to do an actual query
+            public boolean hasNext()
+            {
+                return !pager.isExhausted();
+            }
+
+            public ColumnFamily next()
+            {
+                try
+                {
+                    List<Row> rows = pager.fetchPage(pageSize);
+                    ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
+                    // Never hand out null: an empty page becomes an empty column family
+                    return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Convenience method that counts (live) cells/rows for a given slice of a row, but pages underneath.
+     */
+    public static int countPaged(String keyspace,
+                                String columnFamily,
+                                ByteBuffer key,
+                                SliceQueryFilter filter,
+                                ConsistencyLevel consistencyLevel,
+                                final int pageSize,
+                                long now) throws RequestValidationException, RequestExecutionException
+    {
+        SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
+        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
+
+        // A single counter accumulates over all the pages
+        ColumnCounter counter = filter.columnCounter(Schema.instance.getComparator(keyspace, columnFamily), now);
+        while (!pager.isExhausted())
+        {
+            List<Row> next = pager.fetchPage(pageSize);
+            if (!next.isEmpty())
+                counter.countAll(next.get(0).cf);
+        }
+        return counter.live();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
new file mode 100644
index 0000000..e4d4295
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a name query.
+ *
+ * Note: this only works for NamesQueryFilter that have countCQL3Rows() set: it doesn't do in-row
+ * paging, so pageSize is counted in internal rows (columnCounter() must count 1 per internal row).
+ */
+public class RangeNamesQueryPager extends AbstractQueryPager
+{
+    private final RangeSliceCommand command;
+    private volatile RowPosition lastReturnedKey;
+
+    // Don't use directly, use QueryPagers method instead
+    RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+        this.command = command;
+        assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestExecutionException
+    {
+        AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+        if (lastReturnedKey != null)
+        {
+            AbstractBounds<RowPosition> remaining = makeExcludingKeyBounds(lastReturnedKey);
+            if (remaining == null)
+                return java.util.Collections.<Row>emptyList(); // the whole key range was already returned
+            pageCmd = pageCmd.forSubRange(remaining);
+        }
+
+        return localQuery
+             ? pageCmd.executeLocally()
+             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        // When querying the next page, we create a bound that excludes the lastReturnedKey
+        return false;
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturnedKey = last.key;
+        // We return false as that means "can that last be in the next query?"
+        return false;
+    }
+
+    /**
+     * Returns the part of the key range strictly after lastReturnedKey (already returned), or null if
+     * lastReturnedKey is the inclusive right end of a Range/Bounds: nothing is left then, and
+     * Range(right, right) would instead wrap around the whole ring.
+     */
+    private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+    {
+        AbstractBounds<RowPosition> bounds = command.keyRange;
+        if (!(bounds instanceof Range || bounds instanceof Bounds))
+            return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+        return lastReturnedKey.equals(bounds.right) ? null : new Range<RowPosition>(lastReturnedKey, bounds.right);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
new file mode 100644
index 0000000..578d5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a slice query, possibly paging inside a partition.
+ *
+ * Note: this currently only works for CQL3 queries (thrift queries expect a different limit on
+ * the rows than on the columns, which complicates things).
+ */
+public class RangeSliceQueryPager extends AbstractQueryPager
+{
+    private final RangeSliceCommand command;
+    // Key and cell name of the last returned cell; both null until a first page has been recorded.
+    private volatile RowPosition lastReturnedKey;
+    private volatile ByteBuffer lastReturnedName;
+
+    // Don't use directly, use QueryPagers method instead
+    RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+        this.command = command;
+        assert columnFilter instanceof SliceQueryFilter;
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestExecutionException
+    {
+        SliceQueryFilter slice = (SliceQueryFilter)columnFilter;
+        // First page: the command's own key range and slice start; later: restart at the last returned cell
+        RowPosition resumeKey = lastReturnedKey;
+        AbstractBounds<RowPosition> range = resumeKey == null ? command.keyRange : makeIncludingKeyBounds(resumeKey);
+        ByteBuffer resumeName = lastReturnedName;
+        ByteBuffer sliceStart = resumeName == null ? slice.start() : resumeName;
+
+        PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
+                                                          command.columnFamily,
+                                                          command.timestamp,
+                                                          range,
+                                                          slice,
+                                                          sliceStart,
+                                                          slice.finish(),
+                                                          command.rowFilter,
+                                                          pageSize);
+        if (localQuery)
+            return pageCmd.executeLocally();
+        return StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        if (lastReturnedKey == null || !lastReturnedKey.equals(first.key))
+            return false;
+        return lastReturnedName.equals(firstName(first.cf));
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturnedKey = last.key;
+        lastReturnedName = lastName(last.cf);
+        // true: the next query starts at this very cell, so it may show up again
+        return true;
+    }
+
+    private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+    {
+        // We always include lastReturnedKey since we may still be paging within a row,
+        // and PagedRangeCommand will move over if we're not anyway
+        AbstractBounds<RowPosition> bounds = command.keyRange;
+        if (bounds instanceof Range || bounds instanceof Bounds)
+            return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+        return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
new file mode 100644
index 0000000..693a20e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import org.apache.cassandra.db.filter.ColumnCounter;
+
+/**
+ * A QueryPager restricted to a single partition, queried either by slice or by names.
+ * MultiPartitionPager is meant to build on this interface.
+ */
+public interface SinglePartitionPager extends QueryPager
+{
+    /** Returns the ColumnCounter associated with this pager's query (for use by MultiPartitionPager). */
+    ColumnCounter columnCounter();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
new file mode 100644
index 0000000..58ef3c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceFromReadCommand; after the first page, the slice restarts at the last returned cell name.
+ */
+public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
+{
+    private final SliceFromReadCommand command;
+    // Name of the last cell returned so far; null until a page has been recorded.
+    private volatile ByteBuffer lastReturned;
+
+    // Don't use directly, use QueryPagers method instead
+    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    {
+        super(consistencyLevel, command.filter.count, localQuery, command.table, command.cfName, command.filter, command.timestamp);
+        this.command = command;
+    }
+
+    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    throws RequestValidationException, RequestExecutionException
+    {
+        SliceQueryFilter pageFilter = command.filter.withUpdatedCount(pageSize);
+        if (lastReturned != null)
+            pageFilter = pageFilter.withUpdatedStart(lastReturned, cfm.comparator);
+        ReadCommand pageCmd = command.withUpdatedFilter(pageFilter);
+        if (!localQuery)
+            return StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel);
+        return Collections.singletonList(pageCmd.getRow(Table.open(command.table)));
+    }
+
+    protected boolean containsPreviousLast(Row first)
+    {
+        ByteBuffer previous = lastReturned;
+        return previous != null && previous.equals(firstName(first.cf));
+    }
+
+    protected boolean recordLast(Row last)
+    {
+        lastReturned = lastName(last.cf);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 55688a8..a15927e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.cql.QueryProcessor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -60,6 +61,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -368,31 +370,25 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
-                                                                             List<ByteBuffer> keys,
-                                                                             ColumnParent column_parent,
-                                                                             long timestamp,
-                                                                             SlicePredicate predicate,
-                                                                             ConsistencyLevel consistency_level)
-    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    private SliceQueryFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
     {
-        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
-        ThriftValidation.validateColumnParent(metadata, column_parent);
-        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-
-        org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
-        consistencyLevel.validateForRead(keyspace);
+        SliceQueryFilter filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
+        if (metadata.isSuper())
+            filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, parent.bufferForSuper_column(), filter);
+        return filter;
+    }
 
-        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+    private IDiskAtomFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+    {
         IDiskAtomFilter filter;
         if (predicate.column_names != null)
         {
             if (metadata.isSuper())
             {
                 CompositeType type = (CompositeType)metadata.comparator;
-                SortedSet s = new TreeSet<ByteBuffer>(column_parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
+                SortedSet s = new TreeSet<ByteBuffer>(parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
                 s.addAll(predicate.column_names);
-                filter = SuperColumns.fromSCNamesFilter(type, column_parent.bufferForSuper_column(), new NamesQueryFilter(s));
+                filter = SuperColumns.fromSCNamesFilter(type, parent.bufferForSuper_column(), new NamesQueryFilter(s));
             }
             else
             {
@@ -403,11 +399,28 @@ public class CassandraServer implements Cassandra.Iface
         }
         else
         {
-            SliceRange range = predicate.slice_range;
-            filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
-            if (metadata.isSuper())
-                filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, column_parent.bufferForSuper_column(), filter);
+            filter = toInternalFilter(metadata, parent, predicate.slice_range);
         }
+        return filter;
+    }
+
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+                                                                             List<ByteBuffer> keys,
+                                                                             ColumnParent column_parent,
+                                                                             long timestamp,
+                                                                             SlicePredicate predicate,
+                                                                             ConsistencyLevel consistency_level)
+    throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+    {
+        CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+        ThriftValidation.validateColumnParent(metadata, column_parent);
+        ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+
+        org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+        consistencyLevel.validateForRead(keyspace);
+
+        List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+        IDiskAtomFilter filter = toInternalFilter(metadata, column_parent, predicate);
 
         for (ByteBuffer key: keys)
         {
@@ -530,46 +543,22 @@ public class CassandraServer implements Cassandra.Iface
                 pageSize = COUNT_PAGE_SIZE;
             }
 
-            int totalCount = 0;
-            List<ColumnOrSuperColumn> columns;
-
-            if (predicate.slice_range == null)
-            {
-                predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                       false,
-                                                       Integer.MAX_VALUE);
-            }
-
-            final int requestedCount = predicate.slice_range.count;
-            int remaining = requestedCount;
-            int pages = 0;
-            while (true)
-            {
-                predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
-                columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
-                if (columns.isEmpty())
-                    break;
-
-                ByteBuffer firstName = getName(columns.get(0));
-                int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
-
-                totalCount += newColumns;
-                // if we over-counted, just return original limit
-                if (totalCount > requestedCount)
-                    return requestedCount;
-                remaining -= newColumns;
-                pages++;
-                // We're done if either:
-                // - We've querying the number of columns requested by the user
-                // - last fetched page only contains the column we already fetched
-                if (remaining == 0 || ((columns.size() == 1) && (firstName.equals(predicate.slice_range.start))))
-                    break;
-                else
-                    predicate.slice_range.start = getName(columns.get(columns.size() - 1));
-            }
+            SliceRange sliceRange = predicate.slice_range == null
+                                  ? new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE)
+                                  : predicate.slice_range;
+            SliceQueryFilter filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
 
-            return totalCount;
+            return QueryPagers.countPaged(keyspace,
+                                          column_parent.column_family,
+                                          key,
+                                          filter,
+                                          ThriftConversion.fromThrift(consistency_level),
+                                          pageSize,
+                                          timestamp);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -774,8 +763,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null; // makes javac happy -- it can't tell that rethrow always throws
+            throw ThriftConversion.rethrow(e);
         }
         finally
         {
@@ -1218,22 +1206,16 @@ public class CassandraServer implements Cassandra.Iface
                 bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
             }
 
+            if (range.row_filter != null && !range.row_filter.isEmpty())
+                throw new InvalidRequestException("Cross-row paging is not supported along with index clauses");
+
             List<Row> rows;
             long now = System.currentTimeMillis();
             schedule(DatabaseDescriptor.getRangeRpcTimeout());
             try
             {
                 IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
-                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
-                                                                        column_family,
-                                                                        now,
-                                                                        filter,
-                                                                        bounds,
-                                                                        range.row_filter,
-                                                                        range.count,
-                                                                        true,
-                                                                        true),
-                                                  consistencyLevel);
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, now, filter, bounds, null, range.count, true, true), consistencyLevel);
             }
             finally
             {
@@ -1878,8 +1860,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -1913,8 +1894,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -1994,8 +1974,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {
@@ -2034,12 +2013,15 @@ public class CassandraServer implements Cassandra.Iface
                                                                 itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
 
-            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
+            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
+                                                                            ThriftConversion.fromThrift(cLevel),
+                                                                            cState.getQueryState(),
+                                                                            bindVariables,
+                                                                            -1).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
-            ThriftConversion.rethrow(e);
-            return null;
+            throw ThriftConversion.rethrow(e);
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 28725f0..8a9ab59 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -45,7 +45,9 @@ public class ThriftConversion
         throw new AssertionError();
     }
 
-    public static void rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
+    // We never return, but returning a RuntimeException allows to write "throw rethrow(e)" without java complaining
+    // for methods that have a return value.
+    public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
     {
         if (e instanceof RequestTimeoutException)
             throw toThrift((RequestTimeoutException)e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2250816..2ba21d5 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,5 @@ import org.jboss.netty.buffer.ChannelBuffer;
 public interface CBCodec<T>
 {
     public T decode(ChannelBuffer body, int version);
-    public ChannelBuffer encode(T t);
+    public ChannelBuffer encode(T t, int version);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 8e2d765..2f3f3bd 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -35,14 +35,7 @@ import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.transport.messages.CredentialsMessage;
-import org.apache.cassandra.transport.messages.ExecuteMessage;
-import org.apache.cassandra.transport.messages.OptionsMessage;
-import org.apache.cassandra.transport.messages.PrepareMessage;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-import org.apache.cassandra.transport.messages.AuthResponse;
-import org.apache.cassandra.transport.messages.StartupMessage;
+import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.utils.Hex;
 
 import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
@@ -116,8 +109,37 @@ public class Client extends SimpleClient
         }
         else if (msgType.equals("QUERY"))
         {
-            String query = line.substring(6);
-            return new QueryMessage(query, ConsistencyLevel.ONE);
+            line = line.substring(6);
+            // Ugly hack to allow setting a page size, but that's playground code anyway
+            String query = line;
+            int pageSize = -1;
+            if (line.matches(".+ !\\d+$"))
+            {
+                int idx = line.lastIndexOf('!');
+                query = line.substring(0, idx-1);
+                try
+                {
+                    pageSize = Integer.parseInt(line.substring(idx+1, line.length()));
+                }
+                catch (NumberFormatException e)
+                {
+                    return null;
+                }
+            }
+            return new QueryMessage(query, Collections.<ByteBuffer>emptyList(), ConsistencyLevel.ONE, pageSize);
+        }
+        else if (msgType.equals("NEXT"))
+        {
+            line = line.substring(5);
+            try
+            {
+                int pageSize = Integer.parseInt(line);
+                return new NextMessage(pageSize);
+            }
+            catch (NumberFormatException e)
+            {
+                return null;
+            }
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -145,7 +167,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
+                return new ExecuteMessage(id, values, ConsistencyLevel.ONE, -1);
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 54da6a2..eca3697 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -73,7 +73,8 @@ public abstract class Message
         BATCH          (13, Direction.REQUEST,  BatchMessage.codec),
         AUTH_CHALLENGE (14, Direction.RESPONSE, AuthChallenge.codec),
         AUTH_RESPONSE  (15, Direction.REQUEST,  AuthResponse.codec),
-        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec);
+        AUTH_SUCCESS   (16, Direction.RESPONSE, AuthSuccess.codec),
+        NEXT           (17, Direction.REQUEST,  NextMessage.codec);
 
         public final int opcode;
         public final Direction direction;
@@ -298,11 +299,11 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 ServerConnection connection = (ServerConnection)request.connection();
-                connection.validateNewMessage(request.type, request.getVersion());
+                QueryState qstate = connection.validateNewMessage(request.type, request.getVersion(), request.getStreamId());
 
                 logger.debug("Received: {}, v={}", request, request.getVersion());
 
-                Response response = request.execute(connection.getQueryState(request.getStreamId()));
+                Response response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
                 response.setVersion(request.getVersion());
                 response.attach(connection);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index ec99440..538258d 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -53,7 +53,7 @@ public class ServerConnection extends Connection
         this.state = State.UNINITIALIZED;
     }
 
-    public QueryState getQueryState(int streamId)
+    private QueryState getQueryState(int streamId)
     {
         QueryState qState = queryStates.get(streamId);
         if (qState == null)
@@ -66,7 +66,7 @@ public class ServerConnection extends Connection
         return qState;
     }
 
-    public void validateNewMessage(Message.Type type, int version)
+    public QueryState validateNewMessage(Message.Type type, int version, int streamId)
     {
         switch (state)
         {
@@ -86,6 +86,20 @@ public class ServerConnection extends Connection
             default:
                 throw new AssertionError();
         }
+
+        QueryState qstate = getQueryState(streamId);
+        if (qstate.hasPager())
+        {
+            if (type != Message.Type.NEXT)
+                qstate.dropPager();
+        }
+        else
+        {
+            if (type == Message.Type.NEXT)
+                throw new ProtocolException("Unexpected NEXT message, paging is not set (or is done)");
+        }
+
+        return qstate;
     }
 
     public void applyStateTransition(Message.Type requestType, Message.Type responseType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 993a490..3a9c286 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -156,7 +156,7 @@ public class SimpleClient
 
     public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
     {
-        Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel));
+        Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel, -1));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -170,7 +170,7 @@ public class SimpleClient
 
     public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
+        Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency, -1));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index bc90dc5..63df7d0 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -40,7 +40,7 @@ public class AuthChallenge extends Message.Response
         }
 
         @Override
-        public ChannelBuffer encode(AuthChallenge challenge)
+        public ChannelBuffer encode(AuthChallenge challenge, int version)
         {
             return CBUtil.valueToCB(challenge.token);
         }
@@ -57,7 +57,7 @@ public class AuthChallenge extends Message.Response
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 1f8ed9f..8a33a72 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -52,7 +52,7 @@ public class AuthResponse extends Message.Request
         }
 
         @Override
-        public ChannelBuffer encode(AuthResponse response)
+        public ChannelBuffer encode(AuthResponse response, int version)
         {
             return CBUtil.valueToCB(response.token);
         }
@@ -69,7 +69,7 @@ public class AuthResponse extends Message.Request
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index ba520bc..13c750a 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -43,7 +43,7 @@ public class AuthSuccess extends Message.Response
         }
 
         @Override
-        public ChannelBuffer encode(AuthSuccess success)
+        public ChannelBuffer encode(AuthSuccess success, int version)
         {
             return CBUtil.valueToCB(success.token);
         }
@@ -60,7 +60,7 @@ public class AuthSuccess extends Message.Response
     @Override
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index d781f68..292f748 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,7 +35,7 @@ public class AuthenticateMessage extends Message.Response
             return new AuthenticateMessage(authenticator);
         }
 
-        public ChannelBuffer encode(AuthenticateMessage msg)
+        public ChannelBuffer encode(AuthenticateMessage msg, int version)
         {
             return CBUtil.stringToCB(msg.authenticator);
         }
@@ -51,7 +51,7 @@ public class AuthenticateMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 299d8b8..9fb4482 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -73,7 +73,7 @@ public class BatchMessage extends Message.Request
             return new BatchMessage(toType(type), queryOrIds, variables, consistency);
         }
 
-        public ChannelBuffer encode(BatchMessage msg)
+        public ChannelBuffer encode(BatchMessage msg, int version)
         {
             // We have:
             //   - type
@@ -160,7 +160,7 @@ public class BatchMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index ceff5ba..207907a 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -55,7 +55,7 @@ public class CredentialsMessage extends Message.Request
             return msg;
         }
 
-        public ChannelBuffer encode(CredentialsMessage msg)
+        public ChannelBuffer encode(CredentialsMessage msg, int version)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
 
@@ -78,7 +78,7 @@ public class CredentialsMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 3243bce..3675f08 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,7 +123,7 @@ public class ErrorMessage extends Message.Response
             return new ErrorMessage(te);
         }
 
-        public ChannelBuffer encode(ErrorMessage msg)
+        public ChannelBuffer encode(ErrorMessage msg, int version)
         {
             ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
             ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
@@ -213,7 +213,7 @@ public class ErrorMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 7d67de9..f7a93ae 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,7 +31,7 @@ public class EventMessage extends Message.Response
             return new EventMessage(Event.deserialize(body));
         }
 
-        public ChannelBuffer encode(EventMessage msg)
+        public ChannelBuffer encode(EventMessage msg, int version)
         {
             return msg.event.serialize();
         }
@@ -48,7 +48,7 @@ public class EventMessage extends Message.Response
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 8e2b761..7c35e42 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
+import com.google.common.collect.ImmutableMap;
 import org.jboss.netty.buffer.ChannelBuffer;
 
 import org.apache.cassandra.cql3.CQLStatement;
@@ -49,10 +50,11 @@ public class ExecuteMessage extends Message.Request
                 values.add(CBUtil.readValue(body));
 
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-            return new ExecuteMessage(id, values, consistency);
+            int resultPageSize = body.readInt();
+            return new ExecuteMessage(id, values, consistency, resultPageSize);
         }
 
-        public ChannelBuffer encode(ExecuteMessage msg)
+        public ChannelBuffer encode(ExecuteMessage msg, int version)
         {
             // We have:
             //   - statementId
@@ -60,7 +62,7 @@ public class ExecuteMessage extends Message.Request
             //   - The values
             //   - options
             int vs = msg.values.size();
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(4, 0, vs);
             builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
             builder.add(CBUtil.shortToCB(vs));
 
@@ -69,6 +71,7 @@ public class ExecuteMessage extends Message.Request
                 builder.addValue(value);
 
             builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+            builder.add(CBUtil.intToCB(msg.resultPageSize));
             return builder.build();
         }
     };
@@ -76,23 +79,25 @@ public class ExecuteMessage extends Message.Request
     public final MD5Digest statementId;
     public final List<ByteBuffer> values;
     public final ConsistencyLevel consistency;
+    public final int resultPageSize;
 
-    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
-        this(MD5Digest.wrap(statementId), values, consistency);
+        this(MD5Digest.wrap(statementId), values, consistency, resultPageSize);
     }
 
-    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
         this.values = values;
         this.consistency = consistency;
+        this.resultPageSize = resultPageSize;
     }
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)
@@ -104,6 +109,9 @@ public class ExecuteMessage extends Message.Request
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
+            if (resultPageSize == 0)
+                throw new ProtocolException("The page size cannot be 0");
+
             UUID tracingId = null;
             if (isTracingRequested())
             {
@@ -114,11 +122,16 @@ public class ExecuteMessage extends Message.Request
             if (state.traceNextQuery())
             {
                 state.createTracingSession();
+
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                if (resultPageSize > 0)
+                    builder.put("page_size", Integer.toString(resultPageSize));
+
                 // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
-                Tracing.instance.begin("Execute CQL3 prepared query", Collections.<String, String>emptyMap());
+                Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values);
+            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -132,6 +145,9 @@ public class ExecuteMessage extends Message.Request
         finally
         {
             Tracing.instance.stopSession();
+            // Trash the current session id if we won't need it anymore
+            if (!state.hasPager())
+                state.getAndResetCurrentTracingSession();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/NextMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/NextMessage.java b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
new file mode 100644
index 0000000..d68f603
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * A NEXT request: asks the server for the next page of results of the query
+ * currently being paged on the stream this message is sent on.
+ * <p>
+ * The body is a single int, the page size. A negative page size means no limit
+ * (Integer.MAX_VALUE rows are requested); a page size of 0 is rejected.
+ */
+public class NextMessage extends Message.Request
+{
+    public static final Message.Codec<NextMessage> codec = new Message.Codec<NextMessage>()
+    {
+        public NextMessage decode(ChannelBuffer body, int version)
+        {
+            int resultPageSize = body.readInt();
+            return new NextMessage(resultPageSize);
+        }
+
+        public ChannelBuffer encode(NextMessage msg, int version)
+        {
+            return CBUtil.intToCB(msg.resultPageSize);
+        }
+    };
+
+    /** Requested page size: negative means no limit, 0 is invalid (rejected in execute). */
+    public final int resultPageSize;
+
+    public NextMessage(int resultPageSize)
+    {
+        super(Type.NEXT);
+        this.resultPageSize = resultPageSize;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this, getVersion());
+    }
+
+    /**
+     * Returns the next page of the pager held by {@code state}, tracing the request if asked to.
+     *
+     * @param state the query state of the stream this message was received on
+     * @return the next page, or an {@link ErrorMessage} describing the failure
+     */
+    public Message.Response execute(QueryState state)
+    {
+        try
+        {
+            if (resultPageSize == 0)
+                throw new ProtocolException("The page size cannot be 0");
+
+            /*
+             * If we had traced the previous page and we are asked to trace this one,
+             * record the previous id to allow linking the traces together.
+             */
+            UUID previousTracingId = state.getAndResetCurrentTracingSession();
+
+            UUID tracingId = null;
+            if (isTracingRequested())
+            {
+                tracingId = UUIDGen.getTimeUUID();
+                state.prepareTracingSession(tracingId);
+            }
+
+            if (state.traceNextQuery())
+            {
+                state.createTracingSession();
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+                if (resultPageSize > 0)
+                    builder.put("page_size", Integer.toString(resultPageSize));
+                if (previousTracingId != null)
+                    builder.put("previous_trace", previousTracingId.toString());
+                Tracing.instance.begin("Continue paged CQL3 query", builder.build());
+            }
+
+            // A negative page size asks for everything that is left.
+            Message.Response response = state.getNextPage(resultPageSize < 0 ? Integer.MAX_VALUE : resultPageSize);
+
+            if (tracingId != null)
+                response.setTracingId(tracingId);
+
+            return response;
+        }
+        catch (Exception e)
+        {
+            // Client-caused failures (including protocol misuse such as a 0 page size) are
+            // reported back to the client; only genuinely unexpected errors are logged.
+            if (!(e instanceof RequestValidationException
+                  || e instanceof RequestExecutionException
+                  || e instanceof ProtocolException))
+                logger.error("Unexpected error during query", e);
+            return ErrorMessage.fromException(e);
+        }
+        finally
+        {
+            Tracing.instance.stopSession();
+            // Trash the current session id if we won't need it anymore (paging not set or done)
+            if (!state.hasPager())
+                state.getAndResetCurrentTracingSession();
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "NEXT " + resultPageSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 6e753d3..5afefb5 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,7 +42,7 @@ public class OptionsMessage extends Message.Request
             return new OptionsMessage();
         }
 
-        public ChannelBuffer encode(OptionsMessage msg)
+        public ChannelBuffer encode(OptionsMessage msg, int version)
         {
             return ChannelBuffers.EMPTY_BUFFER;
         }
@@ -55,7 +55,7 @@ public class OptionsMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 3e7fe51..851f3f8 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,7 +38,7 @@ public class PrepareMessage extends Message.Request
             return new PrepareMessage(query);
         }
 
-        public ChannelBuffer encode(PrepareMessage msg)
+        public ChannelBuffer encode(PrepareMessage msg, int version)
         {
             return CBUtil.longStringToCB(msg.query);
         }
@@ -54,7 +54,7 @@ public class PrepareMessage extends Message.Request
 
     public ChannelBuffer encode()
     {
-        return codec.encode(this);
+        return codec.encode(this, getVersion());
     }
 
     public Message.Response execute(QueryState state)