Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:35 UTC

[11/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index c330eea..2c16ace 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,156 +17,133 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.service.ClientState;
 
 abstract class AbstractQueryPager implements QueryPager
 {
-    private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);
+    protected final ReadCommand command;
+    protected final DataLimits limits;
 
-    private final ConsistencyLevel consistencyLevel;
-    private final boolean localQuery;
+    private int remaining;
 
-    protected final CFMetaData cfm;
-    protected final IDiskAtomFilter columnFilter;
-    private final long timestamp;
+    // This is the last key we've been reading from (or can still be reading within). This is the key for
+    // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition
+    // (and this is done in PagerIterator). This can be null (when we start).
+    private DecoratedKey lastKey;
+    private int remainingInPartition;
 
-    private int remaining;
     private boolean exhausted;
-    private boolean shouldFetchExtraRow;
 
-    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
-                                 int toFetch,
-                                 boolean localQuery,
-                                 String keyspace,
-                                 String columnFamily,
-                                 IDiskAtomFilter columnFilter,
-                                 long timestamp)
+    protected AbstractQueryPager(ReadCommand command)
     {
-        this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
+        this.command = command;
+        this.limits = command.limits();
+
+        this.remaining = limits.count();
+        this.remainingInPartition = limits.perPartitionCount();
     }
 
-    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
-                                 int toFetch,
-                                 boolean localQuery,
-                                 CFMetaData cfm,
-                                 IDiskAtomFilter columnFilter,
-                                 long timestamp)
+    public ReadOrderGroup startOrderGroup()
     {
-        this.consistencyLevel = consistencyLevel;
-        this.localQuery = localQuery;
+        return command.startOrderGroup();
+    }
 
-        this.cfm = cfm;
-        this.columnFilter = columnFilter;
-        this.timestamp = timestamp;
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+    {
+        if (isExhausted())
+            return PartitionIterators.EMPTY;
 
-        this.remaining = toFetch;
+        pageSize = Math.min(pageSize, remaining);
+        return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec());
     }
 
-
-    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
     {
         if (isExhausted())
-            return Collections.emptyList();
+            return PartitionIterators.EMPTY;
 
-        int currentPageSize = nextPageSize(pageSize);
-        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+        pageSize = Math.min(pageSize, remaining);
+        return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec());
+    }
 
-        if (rows.isEmpty())
-        {
-            logger.debug("Got empty set of rows, considering pager exhausted");
-            exhausted = true;
-            return Collections.emptyList();
-        }
+    private class PagerIterator extends CountingPartitionIterator
+    {
+        private final DataLimits pageLimits;
 
-        int liveCount = getPageLiveCount(rows);
-        logger.debug("Fetched {} live rows", liveCount);
+        private Row lastRow;
 
-        // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked
-        // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient
-        // but most of the time there should be nothing or very little to trim.
-        if (liveCount > currentPageSize)
+        private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec)
         {
-            rows = discardLast(rows, liveCount - currentPageSize);
-            liveCount = currentPageSize;
+            super(iter, pageLimits, nowInSec);
+            this.pageLimits = pageLimits;
         }
 
-        remaining -= liveCount;
-
-        // If we've got less than requested, there is no more query to do (but
-        // we still need to return the current page)
-        if (liveCount < currentPageSize)
+        @Override
+        @SuppressWarnings("resource") // iter is closed by closing the result
+        public RowIterator next()
         {
-            logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize);
-            exhausted = true;
-        }
+            RowIterator iter = super.next();
+            try
+            {
+                DecoratedKey key = iter.partitionKey();
+                if (lastKey == null || !lastKey.equals(key))
+                    remainingInPartition = limits.perPartitionCount();
 
-        // If it's not the first query and the first column is the last one returned (likely
-        // but not certain since paging can race with deletes/expiration), then remove the
-        // first column.
-        if (containsPreviousLast(rows.get(0)))
-        {
-            rows = discardFirst(rows);
-            remaining++;
-        }
-        // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size,
-        // so if the page is full, trim the last entry
-        else if (shouldFetchExtraRow && !exhausted)
-        {
-            // We've asked for one more than necessary
-            rows = discardLast(rows);
-            remaining++;
+                lastKey = key;
+                return new RowPagerIterator(iter);
+            }
+            catch (RuntimeException e)
+            {
+                iter.close();
+                throw e;
+            }
         }
 
-        logger.debug("Remaining rows to page: {}", remaining);
-
-        if (!isExhausted())
-            shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
+        @Override
+        public void close()
+        {
+            super.close();
+            recordLast(lastKey, lastRow);
 
-        return rows;
-    }
+            int counted = counter.counted();
+            remaining -= counted;
+            remainingInPartition -= counter.countedInCurrentPartition();
+            exhausted = counted < pageLimits.count();
+        }
 
-    private List<Row> filterEmpty(List<Row> result)
-    {
-        for (Row row : result)
+        private class RowPagerIterator extends WrappingRowIterator
         {
-            if (row.cf == null || !row.cf.hasColumns())
+            RowPagerIterator(RowIterator iter)
             {
-                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
-                for (Row row2 : result)
-                {
-                    if (row2.cf == null || !row2.cf.hasColumns())
-                        continue;
+                super(iter);
+            }
 
-                    newResult.add(row2);
-                }
-                return newResult;
+            @Override
+            public Row next()
+            {
+                lastRow = super.next();
+                return lastRow;
             }
         }
-        return result;
     }
 
-    protected void restoreState(int remaining, boolean shouldFetchExtraRow)
+    protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition)
     {
+        this.lastKey = lastKey;
         this.remaining = remaining;
-        this.shouldFetchExtraRow = shouldFetchExtraRow;
+        this.remainingInPartition = remainingInPartition;
     }
 
     public boolean isExhausted()
     {
-        return exhausted || remaining == 0;
+        return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0);
     }
 
     public int maxRemaining()
@@ -174,220 +151,11 @@ abstract class AbstractQueryPager implements QueryPager
         return remaining;
     }
 
-    public long timestamp()
+    protected int remainingInPartition()
     {
-        return timestamp;
-    }
-
-    private int nextPageSize(int pageSize)
-    {
-        return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
-    }
-
-    public ColumnCounter columnCounter()
-    {
-        return columnFilter.columnCounter(cfm.comparator, timestamp);
-    }
-
-    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
-
-    /**
-     * Checks to see if the first row of a new page contains the last row from the previous page.
-     * @param first the first row of the new page
-     * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise
-     */
-    protected abstract boolean containsPreviousLast(Row first);
-
-    /**
-     * Saves the paging state by recording the last seen partition key and cell name (where applicable).
-     * @param last the last row in the current page
-     * @return true if an extra row should be fetched in the next page,false otherwise
-     */
-    protected abstract boolean recordLast(Row last);
-
-    protected abstract boolean isReversed();
-
-    private List<Row> discardFirst(List<Row> rows)
-    {
-        return discardFirst(rows, 1);
-    }
-
-    @VisibleForTesting
-    List<Row> discardFirst(List<Row> rows, int toDiscard)
-    {
-        if (toDiscard == 0 || rows.isEmpty())
-            return rows;
-
-        int i = 0;
-        DecoratedKey firstKey = null;
-        ColumnFamily firstCf = null;
-        while (toDiscard > 0 && i < rows.size())
-        {
-            Row first = rows.get(i++);
-            firstKey = first.key;
-            firstCf = first.cf.cloneMeShallow(isReversed());
-            toDiscard -= isReversed()
-                       ? discardLast(first.cf, toDiscard, firstCf)
-                       : discardFirst(first.cf, toDiscard, firstCf);
-        }
-
-        // If there is less live data than to discard, all is discarded
-        if (toDiscard > 0)
-            return Collections.<Row>emptyList();
-
-        // i is the index of the first row that we are sure to keep. On top of that,
-        // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
-        int count = firstCf.getColumnCount();
-        int newSize = rows.size() - (count == 0 ? i : i - 1);
-        List<Row> newRows = new ArrayList<Row>(newSize);
-        if (count != 0)
-            newRows.add(new Row(firstKey, firstCf));
-        newRows.addAll(rows.subList(i, rows.size()));
-
-        return newRows;
+        return remainingInPartition;
     }
 
-    private List<Row> discardLast(List<Row> rows)
-    {
-        return discardLast(rows, 1);
-    }
-
-    @VisibleForTesting
-    List<Row> discardLast(List<Row> rows, int toDiscard)
-    {
-        if (toDiscard == 0 || rows.isEmpty())
-            return rows;
-
-        int i = rows.size()-1;
-        DecoratedKey lastKey = null;
-        ColumnFamily lastCf = null;
-        while (toDiscard > 0 && i >= 0)
-        {
-            Row last = rows.get(i--);
-            lastKey = last.key;
-            lastCf = last.cf.cloneMeShallow(isReversed());
-            toDiscard -= isReversed()
-                       ? discardFirst(last.cf, toDiscard, lastCf)
-                       : discardLast(last.cf, toDiscard, lastCf);
-        }
-
-        // If there is less live data than to discard, all is discarded
-        if (toDiscard > 0)
-            return Collections.<Row>emptyList();
-
-        // i is the index of the last row that we are sure to keep. On top of that,
-        // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
-        int count = lastCf.getColumnCount();
-        int newSize = count == 0 ? i+1 : i+2;
-        List<Row> newRows = new ArrayList<Row>(newSize);
-        newRows.addAll(rows.subList(0, i+1));
-        if (count != 0)
-            newRows.add(new Row(lastKey, lastCf));
-
-        return newRows;
-    }
-
-    private int getPageLiveCount(List<Row> page)
-    {
-        int count = 0;
-        for (Row row : page)
-            count += columnCounter().countAll(row.cf).live();
-        return count;
-    }
-
-    private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
-    {
-        boolean isReversed = isReversed();
-        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
-        return isReversed
-             ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
-             : discardHead(toDiscard, newCf, cf.iterator(), tester);
-    }
-
-    private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
-    {
-        boolean isReversed = isReversed();
-        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
-        return isReversed
-             ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
-             : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
-    }
-
-    private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
-    {
-        ColumnCounter counter = columnCounter();
-
-        List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
-
-        // Discard the first 'toDiscard' live, non-static cells
-        while (iter.hasNext())
-        {
-            Cell c = iter.next();
-
-            // if it's a static column, don't count it and save it to add to the trimmed results
-            ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
-            if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
-            {
-                staticCells.add(c);
-                continue;
-            }
-
-            counter.count(c, tester);
-
-            // once we've discarded the required amount, add the rest
-            if (counter.live() > toDiscard)
-            {
-                for (Cell staticCell : staticCells)
-                    copy.addColumn(staticCell);
-
-                copy.addColumn(c);
-                while (iter.hasNext())
-                    copy.addColumn(iter.next());
-            }
-        }
-        return Math.min(counter.live(), toDiscard);
-    }
-
-    private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
-    {
-        // Redoing the counting like that is not extremely efficient.
-        // This is called only for reversed slices or in the case of a race between
-        // paging and a deletion (pretty unlikely), so this is probably acceptable.
-        int liveCount = columnCounter().countAll(cf).live();
-
-        ColumnCounter counter = columnCounter();
-        // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard')
-        while (iter.hasNext())
-        {
-            Cell c = iter.next();
-            counter.count(c, tester);
-            if (counter.live() > liveCount - toDiscard)
-                break;
-
-            copy.addColumn(c);
-        }
-        return Math.min(liveCount, toDiscard);
-    }
-
-    /**
-     * Returns the first non-static cell in the ColumnFamily.  This is necessary to avoid recording a static column
-     * as the "last" cell seen in a reversed query.  Because we will always query static columns alongside the normal
-     * data for a page, they are not a good indicator of where paging should resume.  When we begin the next page, we
-     * need to start from the last non-static cell.
-     */
-    protected Cell firstNonStaticCell(ColumnFamily cf)
-    {
-        for (Cell cell : cf)
-        {
-            ColumnDefinition def = cfm.getColumnDefinition(cell.name());
-            if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
-                return cell;
-        }
-        return null;
-    }
-
-    protected static Cell lastCell(ColumnFamily cf)
-    {
-        return cf.getReverseSortedColumns().iterator().next();
-    }
+    protected abstract ReadCommand nextPageReadCommand(int pageSize);
+    protected abstract void recordLast(DecoratedKey key, Row row);
 }
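
For readers following the refactor: the rewritten AbstractQueryPager reduces
the subclass contract to two hooks, nextPageReadCommand(pageSize) and
recordLast(key, row), while the base class tracks the global and per-partition
budgets. The following is a minimal, self-contained sketch of that bookkeeping
using illustrative stand-in types, not the patch's actual classes:

    // Simplified model of the new pager contract; all names are illustrative.
    abstract class SketchPager<K, R>
    {
        protected int remaining;            // rows left across all pages
        protected int remainingInPartition; // rows left in the current partition
        protected K lastKey;                // last partition key seen (null at start)

        protected SketchPager(int totalLimit, int perPartitionLimit)
        {
            this.remaining = totalLimit;
            this.remainingInPartition = perPartitionLimit;
        }

        // Subclasses build the read command for the next page...
        protected abstract Object nextPageReadCommand(int pageSize);
        // ...and record the last position so the next page resumes after it.
        protected abstract void recordLast(K key, R row);

        boolean isExhausted()
        {
            return remaining == 0;
        }
    }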

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 35d0971..4fb1429 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -17,10 +17,14 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.ClientState;
@@ -39,53 +43,44 @@ import org.apache.cassandra.service.ClientState;
  * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
  * blow out memory.
  */
-class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager implements QueryPager
 {
     private final SinglePartitionPager[] pagers;
-    private final long timestamp;
+    private final DataLimits limit;
+
+    private final int nowInSec;
 
     private int remaining;
     private int current;
 
-    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state, int limitForQuery)
+    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state)
     {
+        this.limit = group.limits();
+        this.nowInSec = group.nowInSec();
+
         int i = 0;
         // If it's not the beginning (state != null), we need to find where we were and skip previous commands
         // since they are done.
         if (state != null)
-            for (; i < commands.size(); i++)
-                if (commands.get(i).key.equals(state.partitionKey))
+            for (; i < group.commands.size(); i++)
+                if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey))
                     break;
 
-        if (i >= commands.size())
+        if (i >= group.commands.size())
         {
             pagers = null;
-            timestamp = -1;
             return;
         }
 
-        pagers = new SinglePartitionPager[commands.size() - i];
+        pagers = new SinglePartitionPager[group.commands.size() - i];
         // 'i' is on the first non exhausted pager for the previous page (or the first one)
-        pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state);
-        timestamp = commands.get(i).timestamp;
+        pagers[0] = group.commands.get(i).getPager(state);
 
         // Following ones haven't been started yet
-        for (int j = i + 1; j < commands.size(); j++)
-        {
-            ReadCommand command = commands.get(j);
-            if (command.timestamp != timestamp)
-                throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
-            pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null);
-        }
-
-        remaining = state == null ? limitForQuery : state.remaining;
-    }
+        for (int j = i + 1; j < group.commands.size(); j++)
+            pagers[j - i] = group.commands.get(j).getPager(null);
 
-    private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
-    {
-        return command instanceof SliceFromReadCommand
-             ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state)
-             : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery);
+        remaining = state == null ? limit.count() : state.remaining;
     }
 
     public PagingState state()
@@ -95,7 +90,7 @@ class MultiPartitionPager implements QueryPager
             return null;
 
         PagingState state = pagers[current].state();
-        return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining);
+        return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining, Integer.MAX_VALUE);
     }
 
     public boolean isExhausted()
@@ -113,35 +108,92 @@ class MultiPartitionPager implements QueryPager
         return true;
     }
 
-    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+    public ReadOrderGroup startOrderGroup()
     {
-        List<Row> result = new ArrayList<Row>();
-
-        int remainingThisQuery = Math.min(remaining, pageSize);
-        while (remainingThisQuery > 0 && !isExhausted())
+        // Note that the pagers differ only in the partition key they apply to, so in practice we
+        // can use any sub-pager's ReadOrderGroup to protect the whole pager.
+        for (int i = current; i < pagers.length; i++)
         {
-            // isExhausted has set us on the first non-exhausted pager
-            List<Row> page = pagers[current].fetchPage(remainingThisQuery);
-            if (page.isEmpty())
-                continue;
-
-            Row row = page.get(0);
-            int fetched = pagers[current].columnCounter().countAll(row.cf).live();
-            remaining -= fetched;
-            remainingThisQuery -= fetched;
-            result.add(row);
+            if (pagers[i] != null)
+                return pagers[i].startOrderGroup();
         }
+        throw new AssertionError("Shouldn't be called on an exhausted pager");
+    }
 
-        return result;
+    @SuppressWarnings("resource")
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+    {
+        int toQuery = Math.min(remaining, pageSize);
+        PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
+        CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
+        iter.setCounter(countingIter.counter());
+        return countingIter;
     }
 
-    public int maxRemaining()
+    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
     {
-        return remaining;
+        int toQuery = Math.min(remaining, pageSize);
+        PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup);
+        CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
+        iter.setCounter(countingIter.counter());
+        return countingIter;
+    }
+
+    private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+    {
+        private final int pageSize;
+        private PartitionIterator result;
+        private DataLimits.Counter counter;
+
+        // For "normal" queries
+        private final ConsistencyLevel consistency;
+        private final ClientState clientState;
+
+        // For internal queries
+        private final ReadOrderGroup orderGroup;
+
+        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadOrderGroup orderGroup)
+        {
+            this.pageSize = pageSize;
+            this.consistency = consistency;
+            this.clientState = clientState;
+            this.orderGroup = orderGroup;
+        }
+
+        public void setCounter(DataLimits.Counter counter)
+        {
+            this.counter = counter;
+        }
+
+        protected RowIterator computeNext()
+        {
+            while (result == null || !result.hasNext())
+            {
+                // This sets us on the first non-exhausted pager
+                if (isExhausted())
+                    return endOfData();
+
+                if (result != null)
+                    result.close();
+
+                int toQuery = pageSize - counter.counted();
+                result = consistency == null
+                       ? pagers[current].fetchPageInternal(toQuery, orderGroup)
+                       : pagers[current].fetchPage(toQuery, consistency, clientState);
+            }
+            return result.next();
+        }
+
+        public void close()
+        {
+            remaining -= counter.counted();
+            if (result != null)
+                result.close();
+        }
     }
 
-    public long timestamp()
+    public int maxRemaining()
     {
-        return timestamp;
+        return remaining;
     }
 }
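
The PagersIterator above stitches the per-partition pagers into a single page:
it drains the current sub-pager, advances to the next non-exhausted one, and
stops once the shared counter has filled the page. Below is a self-contained
model of that chaining pattern, with the shared DataLimits.Counter approximated
by a plain int budget (illustrative types, not the patch's):

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Drain sub-iterators in order until the page budget is spent.
    final class ChainedPage<T> implements Iterator<T>
    {
        private final Iterator<Iterator<T>> sources;
        private Iterator<T> current;
        private int budget; // pageSize minus the elements already returned

        ChainedPage(List<Iterator<T>> sources, int pageSize)
        {
            this.sources = sources.iterator();
            this.budget = pageSize;
        }

        public boolean hasNext()
        {
            // Advance to the next non-exhausted source, if any
            while (budget > 0 && (current == null || !current.hasNext()))
            {
                if (!sources.hasNext())
                    return false;
                current = sources.next();
            }
            return budget > 0;
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            budget--; // the real code decrements through a DataLimits.Counter
            return current.next();
        }
    }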

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
deleted file mode 100644
index d03e582..0000000
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
-
-/**
- * Pager over a SliceByNamesReadCommand.
- */
-public class NamesQueryPager implements SinglePartitionPager
-{
-    private final SliceByNamesReadCommand command;
-    private final ConsistencyLevel consistencyLevel;
-    private final ClientState state;
-    private final boolean localQuery;
-
-    private volatile boolean queried;
-
-    /**
-     * For now, we'll only use this in CQL3. In there, as name query can never
-     * yield more than one CQL3 row, there is no need for paging and so this is straight-forward.
-     *
-     * For thrift, we could imagine needing to page, though even then it's very
-     * unlikely unless the pageSize is very small.
-     *
-     * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that
-     * count every cell individually) and the names filter asks for more than pageSize columns.
-     */
-    // Don't use directly, use QueryPagers method instead
-    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, boolean localQuery)
-    {
-        this.command = command;
-        this.consistencyLevel = consistencyLevel;
-        this.state = state;
-        this.localQuery = localQuery;
-    }
-
-    public ByteBuffer key()
-    {
-        return command.key;
-    }
-
-    public ColumnCounter columnCounter()
-    {
-        // We know NamesQueryFilter.columnCounter don't care about his argument
-        return command.filter.columnCounter(null, command.timestamp);
-    }
-
-    public PagingState state()
-    {
-        return null;
-    }
-
-    public boolean isExhausted()
-    {
-        return queried;
-    }
-
-    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
-    {
-        assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
-
-        if (isExhausted())
-            return Collections.<Row>emptyList();
-
-        queried = true;
-        return localQuery
-             ? Collections.singletonList(command.getRow(Keyspace.open(command.ksName)))
-             : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel, state);
-    }
-
-    public int maxRemaining()
-    {
-        if (queried)
-            return 0;
-
-        return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size();
-    }
-
-    public long timestamp()
-    {
-        return command.timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
deleted file mode 100644
index d4986f7..0000000
--- a/src/java/org/apache/cassandra/service/pager/Pageable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.util.List;
-
-import org.apache.cassandra.db.ReadCommand;
-
-/**
- * Marker interface for commands that can be paged.
- */
-public interface Pageable
-{
-    public static class ReadCommands implements Pageable
-    {
-        public final List<ReadCommand> commands;
-
-        public final int limitForQuery;
-
-        public ReadCommands(List<ReadCommand> commands, int limitForQuery)
-        {
-            this.commands = commands;
-            this.limitForQuery = limitForQuery;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index f168880..685dc3f 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -31,12 +31,14 @@ public class PagingState
     public final ByteBuffer partitionKey;
     public final ByteBuffer cellName;
     public final int remaining;
+    public final int remainingInPartition;
 
-    public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining)
+    public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining, int remainingInPartition)
     {
         this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
         this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName;
         this.remaining = remaining;
+        this.remainingInPartition = remainingInPartition;
     }
 
     public static PagingState deserialize(ByteBuffer bytes)
@@ -50,7 +52,12 @@ public class PagingState
             ByteBuffer pk = ByteBufferUtil.readWithShortLength(in);
             ByteBuffer cn = ByteBufferUtil.readWithShortLength(in);
             int remaining = in.readInt();
-            return new PagingState(pk, cn, remaining);
+            // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
+            // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
+            // bytes remain to be read. We condition the read for backward compatibility,
+            // since older versions did not write this field.
+            int remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
+            return new PagingState(pk, cn, remaining, remainingInPartition);
         }
         catch (IOException e)
         {
@@ -65,6 +72,7 @@ public class PagingState
             ByteBufferUtil.writeWithShortLength(partitionKey, out);
             ByteBufferUtil.writeWithShortLength(cellName, out);
             out.writeInt(remaining);
+            out.writeInt(remainingInPartition);
             return out.buffer();
         }
         catch (IOException e)
@@ -77,12 +85,16 @@ public class PagingState
     {
         return 2 + partitionKey.remaining()
              + 2 + cellName.remaining()
-             + 4;
+             + 8; // remaining & remainingInPartition
     }
 
     @Override
     public String toString()
     {
-        return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining);
+        return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d)",
+                             ByteBufferUtil.bytesToHex(partitionKey),
+                             ByteBufferUtil.bytesToHex(cellName),
+                             remaining,
+                             remainingInPartition);
     }
 }
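
The deserialize change above is a compact example of appending a field without
bumping the serialization format: because the stream wraps a ByteBuffer,
in.available() is exact, so the new trailing field can be read conditionally.
A standalone illustration of the same trick (names are illustrative, not the
PagingState code itself):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    final class OptionalTrailingField
    {
        // If bytes remain after the old fields, the new field was written;
        // otherwise fall back to a default, as PagingState.deserialize does.
        static int[] read(byte[] bytes) throws IOException
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int remaining = in.readInt();
            // Old serializers stopped here; new ones append remainingInPartition.
            int remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
            return new int[]{ remaining, remainingInPartition };
        }
    }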

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index ab2dad7..a69335d 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -17,11 +17,13 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.List;
-
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Perform a query, paging it by page of a given size.
@@ -44,13 +46,69 @@ import org.apache.cassandra.exceptions.RequestValidationException;
  */
 public interface QueryPager
 {
+    public static final QueryPager EMPTY = new QueryPager()
+    {
+        public ReadOrderGroup startOrderGroup()
+        {
+            return ReadOrderGroup.emptyGroup();
+        }
+
+        public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+        {
+            return PartitionIterators.EMPTY;
+        }
+
+        public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
+        {
+            return PartitionIterators.EMPTY;
+        }
+
+        public boolean isExhausted()
+        {
+            return true;
+        }
+
+        public int maxRemaining()
+        {
+            return 0;
+        }
+
+        public PagingState state()
+        {
+            return null;
+        }
+    };
+
     /**
      * Fetches the next page.
      *
      * @param pageSize the maximum number of elements to return in the next page.
+     * @param consistency the consistency level to achieve for the query.
+     * @param clientState the {@code ClientState} for the query. In practice, this can be null unless
+     * {@code consistency} is a serial consistency.
+     * @return the page of result.
+     */
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException;
+
+    /**
+     * Starts a new read operation.
+     * <p>
+     * This must be called before {@link #fetchPageInternal} and passed to it to protect the read.
+     * The returned object <b>must</b> be closed on all paths, and it is thus strongly advised to
+     * use it in a try-with-resources construction.
+     *
+     * @return a newly started order group for this {@code QueryPager}.
+     */
+    public ReadOrderGroup startOrderGroup();
+
+    /**
+     * Fetches the next page internally (in other words, this does a local query).
+     *
+     * @param pageSize the maximum number of elements to return in the next page.
+     * @param orderGroup the {@code ReadOrderGroup} protecting the read.
      * @return the page of result.
      */
-    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Whether or not this pager is exhausted, i.e. whether or not a call to
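
For internal (local) queries, the interface above implies a usage pattern like
the following. This is an inferred sketch based only on the signatures and
javadoc in this patch; it assumes the patch's classes are on the classpath and
that ReadOrderGroup, PartitionIterator and RowIterator are all AutoCloseable,
as the javadoc and the @SuppressWarnings("resource") annotations suggest:

    import org.apache.cassandra.db.ReadOrderGroup;
    import org.apache.cassandra.db.partitions.PartitionIterator;
    import org.apache.cassandra.db.rows.RowIterator;

    // Inferred usage sketch: page through a query locally, closing the order
    // group, every page, and every partition, as the javadoc requires.
    static void consumeAllPages(QueryPager pager, int pageSize) throws Exception
    {
        try (ReadOrderGroup orderGroup = pager.startOrderGroup())
        {
            while (!pager.isExhausted())
            {
                try (PartitionIterator page = pager.fetchPageInternal(pageSize, orderGroup))
                {
                    while (page.hasNext())
                    {
                        try (RowIterator partition = page.next())
                        {
                            while (partition.hasNext())
                                partition.next(); // process each row here
                        }
                    }
                }
            }
        }
    }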

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index f933ccb..618ca32 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -17,180 +17,47 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.ClientState;
 
 /**
- * Static utility methods to create query pagers.
+ * Static utility methods for paging.
  */
 public class QueryPagers
 {
     private QueryPagers() {};
 
-    private static int maxQueried(ReadCommand command)
-    {
-        if (command instanceof SliceByNamesReadCommand)
-        {
-            NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
-            return filter.countCQL3Rows() ? 1 : filter.columns.size();
-        }
-        else
-        {
-            SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
-            return filter.count;
-        }
-    }
-
-    public static boolean mayNeedPaging(Pageable command, int pageSize)
-    {
-        if (command instanceof Pageable.ReadCommands)
-        {
-            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
-
-            // Using long on purpose, as we could overflow otherwise
-            long maxQueried = 0;
-            for (ReadCommand readCmd : commands)
-                maxQueried += maxQueried(readCmd);
-
-            return maxQueried > pageSize;
-        }
-        else if (command instanceof ReadCommand)
-        {
-            return maxQueried((ReadCommand)command) > pageSize;
-        }
-        else
-        {
-            assert command instanceof RangeSliceCommand;
-            RangeSliceCommand rsc = (RangeSliceCommand)command;
-            // We don't support paging for thrift in general because the way thrift RangeSliceCommand count rows
-            // independently of cells makes things harder (see RangeSliceQueryPager). The one case where we do
-            // get a RangeSliceCommand from CQL3 without the countCQL3Rows flag set is for DISTINCT. In that case
-            // however, the underlying sliceQueryFilter count is 1, so that the RSC limit is still a limit on the
-            // number of CQL3 rows returned.
-            assert rsc.countCQL3Rows || (rsc.predicate instanceof SliceQueryFilter && ((SliceQueryFilter)rsc.predicate).count == 1);
-            return rsc.maxResults > pageSize;
-        }
-    }
-
-    private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
-    {
-        if (command instanceof SliceByNamesReadCommand)
-            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, local);
-        else
-            return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, local, state);
-    }
-
-    private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
-    {
-        if (command instanceof Pageable.ReadCommands)
-        {
-            List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
-            if (commands.size() == 1)
-                return pager(commands.get(0), consistencyLevel, cState, local, state);
-
-            return new MultiPartitionPager(commands, consistencyLevel, cState, local, state, ((Pageable.ReadCommands) command).limitForQuery);
-        }
-        else if (command instanceof ReadCommand)
-        {
-            return pager((ReadCommand)command, consistencyLevel, cState, local, state);
-        }
-        else
-        {
-            assert command instanceof RangeSliceCommand;
-            RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
-            if (rangeCommand.predicate instanceof NamesQueryFilter)
-                return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local, state);
-            else
-                return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local, state);
-        }
-    }
-
-    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState)
-    {
-        return pager(command, consistencyLevel, cState, false, null);
-    }
-
-    public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, PagingState state)
-    {
-        return pager(command, consistencyLevel, cState, false, state);
-    }
-
-    public static QueryPager localPager(Pageable command)
-    {
-        return pager(command, null, null, true, null);
-    }
-
-    /**
-     * Convenience method to (locally) page an internal row.
-     * Used to 2ndary index a wide row without dying.
-     */
-    public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
-    {
-        SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
-        final SliceQueryPager pager = new SliceQueryPager(command, null, null, true);
-
-        return new Iterator<ColumnFamily>()
-        {
-            // We don't use AbstractIterator because we don't want hasNext() to do an actual query
-            public boolean hasNext()
-            {
-                return !pager.isExhausted();
-            }
-
-            public ColumnFamily next()
-            {
-                try
-                {
-                    List<Row> rows = pager.fetchPage(pageSize);
-                    ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
-                    return cf == null ? ArrayBackedSortedColumns.factory.create(cfs.metadata) : cf;
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
     /**
      * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
      */
-    public static int countPaged(String keyspace,
-                                 String columnFamily,
-                                 ByteBuffer key,
-                                 SliceQueryFilter filter,
+    public static int countPaged(CFMetaData metadata,
+                                 DecoratedKey key,
+                                 ColumnFilter columnFilter,
+                                 ClusteringIndexFilter filter,
+                                 DataLimits limits,
                                  ConsistencyLevel consistencyLevel,
-                                 ClientState cState,
+                                 ClientState state,
                                  final int pageSize,
-                                 long now) throws RequestValidationException, RequestExecutionException
+                                 int nowInSec,
+                                 boolean isForThrift) throws RequestValidationException, RequestExecutionException
     {
-        SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
-        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false);
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
+        final SinglePartitionPager pager = new SinglePartitionPager(command, null);
 
-        ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
+        int count = 0;
         while (!pager.isExhausted())
         {
-            List<Row> next = pager.fetchPage(pageSize);
-            if (!next.isEmpty())
-                counter.countAll(next.get(0).cf);
+            try (CountingPartitionIterator iter = new CountingPartitionIterator(pager.fetchPage(pageSize, consistencyLevel, state), limits, nowInSec))
+            {
+                PartitionIterators.consume(iter);
+                count += iter.counter().counted();
+            }
         }
-        return counter.live();
+        return count;
     }
 }
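
The rewritten countPaged above replaces the old cell counting with a simple
consume-and-sum loop over pages. Stripped of the Cassandra types, the shape of
that loop is the following self-contained sketch (the Pager interface is a
stand-in, not the patch's QueryPager):

    import java.util.Iterator;

    final class CountByPaging
    {
        interface Pager
        {
            boolean isExhausted();
            Iterator<?> fetchPage(int pageSize);
        }

        // Fetch a page, consume it, add what was counted, repeat until done.
        static int countPaged(Pager pager, int pageSize)
        {
            int count = 0;
            while (!pager.isExhausted())
            {
                Iterator<?> page = pager.fetchPage(pageSize);
                while (page.hasNext())
                {
                    page.next();
                    count++;
                }
            }
            return count;
        }
    }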

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index 50d1280..fffb4e1 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -17,13 +17,11 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.List;
-
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
 /**
@@ -37,25 +35,17 @@ import org.apache.cassandra.service.StorageService;
  */
 public class RangeNamesQueryPager extends AbstractQueryPager
 {
-    private final RangeSliceCommand command;
     private volatile DecoratedKey lastReturnedKey;
 
-    // Don't use directly, use QueryPagers method instead
-    RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+    public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state)
     {
-        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
-        this.command = command;
-        assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
-    }
-
-    RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
-    {
-        this(command, consistencyLevel, localQuery);
+        super(command);
+        assert command.isNamesQuery();
 
         if (state != null)
         {
             lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
-            restoreState(state.remaining, true);
+            restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }
 
@@ -63,51 +53,36 @@ public class RangeNamesQueryPager extends AbstractQueryPager
     {
         return lastReturnedKey == null
              ? null
-             : new PagingState(lastReturnedKey.getKey(), null, maxRemaining());
+             : new PagingState(lastReturnedKey.getKey(), null, maxRemaining(), remainingInPartition());
     }
 
-    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    protected ReadCommand nextPageReadCommand(int pageSize)
     throws RequestExecutionException
     {
-        AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+        PartitionRangeReadCommand pageCmd = ((PartitionRangeReadCommand)command).withUpdatedLimit(command.limits().forPaging(pageSize));
         if (lastReturnedKey != null)
             pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey));
 
-        return localQuery
-             ? pageCmd.executeLocally()
-             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
-    }
-
-    protected boolean containsPreviousLast(Row first)
-    {
-        // When querying the next page, we create a bound that exclude the lastReturnedKey
-        return false;
-    }
-
-    protected boolean recordLast(Row last)
-    {
-        lastReturnedKey = last.key;
-        // We return false as that means "can that last be in the next query?"
-        return false;
+        return pageCmd;
     }
 
-    protected boolean isReversed()
+    protected void recordLast(DecoratedKey key, Row last)
     {
-        return false;
+        lastReturnedKey = key;
     }
 
-    private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+    private AbstractBounds<PartitionPosition> makeExcludingKeyBounds(PartitionPosition lastReturnedKey)
     {
         // We return a range that always exclude lastReturnedKey, since we've already
         // returned it.
-        AbstractBounds<RowPosition> bounds = command.keyRange;
+        AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
         if (bounds instanceof Range || bounds instanceof Bounds)
         {
-            return new Range<RowPosition>(lastReturnedKey, bounds.right);
+            return new Range<PartitionPosition>(lastReturnedKey, bounds.right);
         }
         else
         {
-            return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+            return new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
         }
     }
 }
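
The makeExcludingKeyBounds choice above depends on the inclusiveness
conventions of the AbstractBounds hierarchy: assuming the usual conventions
(Bounds includes both ends, Range excludes its left bound and includes its
right, ExcludingBounds excludes both), a Range suffices to skip
lastReturnedKey when the right bound is inclusive, and an ExcludingBounds is
needed otherwise. A toy model of that decision (an enum standing in for the
class hierarchy; illustrative only):

    // Pick a left-exclusive bound kind that preserves the right bound's
    // inclusiveness, so lastReturnedKey is never returned twice.
    enum BoundKind
    {
        BOUNDS(true, true),             // [left, right]
        RANGE(false, true),             // (left, right]
        EXCLUDING_BOUNDS(false, false), // (left, right)
        INCL_EXCL_BOUNDS(true, false);  // [left, right)

        final boolean leftInclusive;
        final boolean rightInclusive;

        BoundKind(boolean leftInclusive, boolean rightInclusive)
        {
            this.leftInclusive = leftInclusive;
            this.rightInclusive = rightInclusive;
        }

        BoundKind excludingLastReturnedKey()
        {
            return rightInclusive ? RANGE : EXCLUDING_BOUNDS;
        }
    }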

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index c9a28e8..6429be0 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,17 +17,16 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.List;
-
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Pages a RangeSliceCommand whose predicate is a slice query.
  *
@@ -36,27 +35,21 @@ import org.apache.cassandra.service.StorageService;
  */
 public class RangeSliceQueryPager extends AbstractQueryPager
 {
-    private final RangeSliceCommand command;
-    private volatile DecoratedKey lastReturnedKey;
-    private volatile CellName lastReturnedName;
+    private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class);
 
-    // Don't use directly, use QueryPagers method instead
-    RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
-    {
-        super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
-        this.command = command;
-        assert columnFilter instanceof SliceQueryFilter;
-    }
+    private volatile DecoratedKey lastReturnedKey;
+    private volatile Clustering lastReturnedClustering;
 
-    RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state)
     {
-        this(command, consistencyLevel, localQuery);
+        super(command);
+        assert !command.isNamesQuery();
 
         if (state != null)
         {
             lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
-            lastReturnedName = cfm.comparator.cellFromByteBuffer(state.cellName);
-            restoreState(state.remaining, true);
+            lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }
 
@@ -64,67 +57,63 @@ public class RangeSliceQueryPager extends AbstractQueryPager
     {
         return lastReturnedKey == null
              ? null
-             : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(), maxRemaining());
+             : new PagingState(lastReturnedKey.getKey(), LegacyLayout.encodeClustering(command.metadata(), lastReturnedClustering), maxRemaining(), remainingInPartition());
     }
 
-    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+    protected ReadCommand nextPageReadCommand(int pageSize)
     throws RequestExecutionException
     {
-        SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
-        AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey);
-        Composite start = lastReturnedName == null ? sf.start() : lastReturnedName;
-        PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
-                                                          command.columnFamily,
-                                                          command.timestamp,
-                                                          keyRange,
-                                                          sf,
-                                                          start,
-                                                          sf.finish(),
-                                                          command.rowFilter,
-                                                          pageSize,
-                                                          command.countCQL3Rows);
-
-        return localQuery
-             ? pageCmd.executeLocally()
-             : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
-    }
-
-    protected boolean containsPreviousLast(Row first)
-    {
-        if (lastReturnedKey == null || !lastReturnedKey.equals(first.key))
-            return false;
-
-        // As in SliceQueryPager, we ignore a deleted column
-        Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
-        return !first.cf.deletionInfo().isDeleted(firstCell)
-            && firstCell.isLive(timestamp())
-            && lastReturnedName.equals(firstCell.name());
-    }
+        DataLimits limits;
+        DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+        DataRange pageRange;
+        if (lastReturnedKey == null)
+        {
+            pageRange = fullRange;
+            limits = command.limits().forPaging(pageSize);
+        }
+        else
+        {
+            // We want to include the last returned key only if we haven't reached our per-partition limit; otherwise, don't bother.
+            boolean includeLastKey = remainingInPartition() > 0;
+            AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
+            if (includeLastKey)
+            {
+                pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedClustering, false);
+                limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
+            }
+            else
+            {
+                pageRange = fullRange.forSubRange(bounds);
+                limits = command.limits().forPaging(pageSize);
+            }
+        }
 
-    protected boolean recordLast(Row last)
-    {
-        lastReturnedKey = last.key;
-        lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name();
-        return true;
+        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
     }
 
-    protected boolean isReversed()
+    protected void recordLast(DecoratedKey key, Row last)
     {
-        return ((SliceQueryFilter)command.predicate).reversed;
+        if (last != null)
+        {
+            lastReturnedKey = key;
+            lastReturnedClustering = last.clustering().takeAlias();
+        }
     }
 
-    private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+    private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
     {
-        // We always include lastReturnedKey since we may still be paging within a row,
-        // and PagedRangeCommand will skip past it if we're not
-        AbstractBounds<RowPosition> bounds = command.keyRange;
+        AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
         if (bounds instanceof Range || bounds instanceof Bounds)
         {
-            return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+            return includeLastKey
+                 ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
+                 : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
         }
         else
         {
-            return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+            return includeLastKey
+                 ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right)
+                 : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
         }
     }
 }
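
nextPageReadCommand and makeKeyBounds together cover four cases: the last
returned partition may still have rows to page (so its key is kept and paging
resumes after lastReturnedClustering), and the original right bound may or may
not be inclusive. A standalone sketch of that decision table (illustrative
interval notation only; the real code returns the AbstractBounds subclasses
named in the comments):

    public class KeyBoundsSketch
    {
        // Left inclusiveness follows includeLastKey; right inclusiveness
        // follows the original key range.
        static String bounds(boolean includeLastKey, boolean rightInclusive)
        {
            return (includeLastKey ? "[lastKey, " : "(lastKey, ")
                 + (rightInclusive ? "right]" : "right)");
        }

        public static void main(String[] args)
        {
            System.out.println(bounds(true, true));   // [lastKey, right]  -> Bounds
            System.out.println(bounds(true, false));  // [lastKey, right)  -> IncludingExcludingBounds
            System.out.println(bounds(false, true));  // (lastKey, right]  -> Range
            System.out.println(bounds(false, false)); // (lastKey, right)  -> ExcludingBounds
        }
    }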

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 51bbf90..6488641 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -19,15 +19,67 @@ package org.apache.cassandra.service.pager;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.filter.ColumnCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
 
 /**
  * Common interface to single partition queries (by slice and by name).
  *
  * For use by MultiPartitionPager.
  */
-public interface SinglePartitionPager extends QueryPager
+public class SinglePartitionPager extends AbstractQueryPager
 {
-    public ByteBuffer key();
-    public ColumnCounter columnCounter();
+    private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
+
+    private final SinglePartitionReadCommand<?> command;
+
+    private volatile Clustering lastReturned;
+
+    public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state)
+    {
+        super(command);
+        this.command = command;
+
+        if (state != null)
+        {
+            lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+        }
+    }
+
+    public ByteBuffer key()
+    {
+        return command.partitionKey().getKey();
+    }
+
+    public DataLimits limits()
+    {
+        return command.limits();
+    }
+
+    public PagingState state()
+    {
+        return lastReturned == null
+             ? null
+             : new PagingState(null, LegacyLayout.encodeClustering(command.metadata(), lastReturned), maxRemaining(), remainingInPartition());
+    }
+
+    protected ReadCommand nextPageReadCommand(int pageSize)
+    {
+        return command.forPaging(lastReturned, pageSize);
+    }
+
+    protected void recordLast(DecoratedKey key, Row last)
+    {
+        if (last != null)
+            lastReturned = last.clustering().takeAlias();
+    }
 }
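
The PagingState round-trip is the contract here: a pager built with a non-null
state resumes after the saved clustering, and state() captures where to resume
next. A hedged sketch of a driver loop, assuming QueryPager exposes
fetchPage(int) and isExhausted() (neither signature is shown in this diff):

    // Hypothetical caller; process(...) is a stand-in for result consumption.
    QueryPager pager = new SinglePartitionPager(command, savedState);
    while (!pager.isExhausted())
    {
        process(pager.fetchPage(pageSize)); // one page of rows
        savedState = pager.state();         // ship to the client to resume later
    }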

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
deleted file mode 100644
index bc364aa..0000000
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pager over a SliceFromReadCommand.
- */
-public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
-{
-    private static final Logger logger = LoggerFactory.getLogger(SliceQueryPager.class);
-
-    private final SliceFromReadCommand command;
-    private final ClientState cstate;
-
-    private volatile Composite lastReturned;
-
-    // Don't use directly; use the QueryPagers methods instead
-    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery)
-    {
-        super(consistencyLevel, command.filter.count, localQuery, command.ksName, command.cfName, command.filter, command.timestamp);
-        this.command = command;
-        this.cstate = cstate;
-    }
-
-    SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery, PagingState state)
-    {
-        this(command, consistencyLevel, cstate, localQuery);
-
-        if (state != null)
-        {
-            lastReturned = cfm.comparator.fromByteBuffer(state.cellName);
-            restoreState(state.remaining, true);
-        }
-    }
-
-    public ByteBuffer key()
-    {
-        return command.key;
-    }
-
-    public PagingState state()
-    {
-        return lastReturned == null
-             ? null
-             : new PagingState(null, lastReturned.toByteBuffer(), maxRemaining());
-    }
-
-    protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
-    throws RequestValidationException, RequestExecutionException
-    {
-        // For some queries, such as a DISTINCT query on static columns, the limit for slice queries will be lower
-        // than the page size (in the static example, it will be 1).  We use the min here to ensure we don't fetch
-        // more rows than we're supposed to.  See CASSANDRA-8108 for more details.
-        SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize));
-        if (lastReturned != null)
-            filter = filter.withUpdatedStart(lastReturned, cfm);
-
-        logger.debug("Querying next page of slice query; new filter: {}", filter);
-        ReadCommand pageCmd = command.withUpdatedFilter(filter);
-        return localQuery
-             ? Collections.singletonList(pageCmd.getRow(Keyspace.open(command.ksName)))
-             : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel, cstate);
-    }
-
-    protected boolean containsPreviousLast(Row first)
-    {
-        if (lastReturned == null)
-            return false;
-
-        Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
-        // Note: we only return true if the column is lastReturned *and* it is live. If it is deleted, it is ignored by the
-        // rest of the paging code (in particular, it hasn't been counted as live) and we want to act as if it weren't there.
-        return !first.cf.deletionInfo().isDeleted(firstCell)
-            && firstCell.isLive(timestamp())
-            && lastReturned.equals(firstCell.name());
-    }
-
-    protected boolean recordLast(Row last)
-    {
-        Cell lastCell = isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf);
-        lastReturned = lastCell.name();
-        return true;
-    }
-
-    protected boolean isReversed()
-    {
-        return command.filter.reversed;
-    }
-}
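
The deleted pager documents the old dedup strategy: each new page re-fetched
the boundary cell and containsPreviousLast dropped it again, instead of
excluding it from the query up front as the new pagers do. A standalone
distillation of that rule (illustrative only):

    // Old rule: drop the first cell of a new page only when it is live and is
    // exactly the cell recorded from the previous page; a deleted boundary cell
    // was never counted as live, so it is left for the paging code to ignore.
    static boolean containsPreviousLast(String firstCellName, boolean isLive, String lastReturned)
    {
        return lastReturned != null && isLive && lastReturned.equals(firstCellName);
    }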

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 45d04f9..6077166 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -24,14 +24,18 @@ package org.apache.cassandra.service.paxos;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.UUID;
-import java.nio.ByteBuffer;
 
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -40,34 +44,31 @@ public class Commit
 {
     public static final CommitSerializer serializer = new CommitSerializer();
 
-    public final ByteBuffer key;
     public final UUID ballot;
-    public final ColumnFamily update;
+    public final PartitionUpdate update;
 
-    public Commit(ByteBuffer key, UUID ballot, ColumnFamily update)
+    public Commit(UUID ballot, PartitionUpdate update)
     {
-        assert key != null;
         assert ballot != null;
         assert update != null;
 
-        this.key = key;
         this.ballot = ballot;
         this.update = update;
     }
 
-    public static Commit newPrepare(ByteBuffer key, CFMetaData metadata, UUID ballot)
+    public static Commit newPrepare(DecoratedKey key, CFMetaData metadata, UUID ballot)
     {
-        return new Commit(key, ballot, ArrayBackedSortedColumns.factory.create(metadata));
+        return new Commit(ballot, PartitionUpdate.emptyUpdate(metadata, key));
     }
 
-    public static Commit newProposal(ByteBuffer key, UUID ballot, ColumnFamily update)
+    public static Commit newProposal(UUID ballot, PartitionUpdate update)
     {
-        return new Commit(key, ballot, updatesWithPaxosTime(update, ballot));
+        return new Commit(ballot, updatesWithPaxosTime(update, ballot));
     }
 
-    public static Commit emptyCommit(ByteBuffer key, CFMetaData metadata)
+    public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata)
     {
-        return new Commit(key, UUIDGen.minTimeUUID(0), ArrayBackedSortedColumns.factory.create(metadata));
+        return new Commit(UUIDGen.minTimeUUID(0), PartitionUpdate.emptyUpdate(metadata, key));
     }
 
     public boolean isAfter(Commit other)
@@ -83,7 +84,7 @@ public class Commit
     public Mutation makeMutation()
     {
         assert update != null;
-        return new Mutation(key, update);
+        return new Mutation(update);
     }
 
     @Override
@@ -95,7 +96,6 @@ public class Commit
         Commit commit = (Commit) o;
 
         if (!ballot.equals(commit.ballot)) return false;
-        if (!key.equals(commit.key)) return false;
         if (!update.equals(commit.update)) return false;
 
         return true;
@@ -104,52 +104,88 @@ public class Commit
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(key, ballot, update);
+        return Objects.hashCode(ballot, update);
     }
 
-    private static ColumnFamily updatesWithPaxosTime(ColumnFamily updates, UUID ballot)
+    private static PartitionUpdate updatesWithPaxosTime(PartitionUpdate update, UUID ballot)
     {
-        ColumnFamily cf = updates.cloneMeShallow();
         long t = UUIDGen.microsTimestamp(ballot);
-        // For the tombstones, we use t-1 so that when inserting a collection literal, the range tombstone that deletes the
-        // previous values of the collection gets a lower timestamp than our new values. Since tombstones win over normal
-        // inserts, using t-1 should not be a problem in general (see #6069).
-        cf.deletionInfo().updateAllTimestamp(t-1);
-        for (Cell cell : updates)
-            cf.addAtom(cell.withUpdatedTimestamp(t));
-        return cf;
+        // Using t-1 for tombstones so deletion doesn't trump newly inserted data (#6069)
+        PartitionUpdate newUpdate = new PartitionUpdate(update.metadata(),
+                                                        update.partitionKey(),
+                                                        update.deletionInfo().updateAllTimestamp(t-1),
+                                                        update.columns(),
+                                                        update.rowCount());
+
+        if (!update.staticRow().isEmpty())
+            copyWithUpdatedTimestamp(update.staticRow(), newUpdate.staticWriter(), t);
+
+        for (Row row : update)
+            copyWithUpdatedTimestamp(row, newUpdate.writer(), t);
+
+        return newUpdate;
+    }
+
+    private static void copyWithUpdatedTimestamp(Row row, Row.Writer writer, long timestamp)
+    {
+        Rows.writeClustering(row.clustering(), writer);
+        writer.writePartitionKeyLivenessInfo(row.primaryKeyLivenessInfo().withUpdatedTimestamp(timestamp));
+        writer.writeRowDeletion(row.deletion());
+
+        for (Cell cell : row)
+            writer.writeCell(cell.column(), cell.isCounterCell(), cell.value(), cell.livenessInfo().withUpdatedTimestamp(timestamp), cell.path());
+
+        for (int i = 0; i < row.columns().complexColumnCount(); i++)
+        {
+            ColumnDefinition c = row.columns().getComplex(i);
+            DeletionTime dt = row.getDeletion(c);
+            // We use t-1 to make sure that on inserting a collection literal, the deletion that comes with it does not
+            // end up deleting the inserted data (see #6069)
+            if (!dt.isLive())
+                writer.writeComplexDeletion(c, new SimpleDeletionTime(timestamp-1, dt.localDeletionTime()));
+        }
+        writer.endOfRow();
     }
 
     @Override
     public String toString()
     {
-        return String.format("Commit(%s, %s, %s)", ByteBufferUtil.bytesToHex(key), ballot, update);
+        return String.format("Commit(%s, %s)", ballot, update);
     }
 
     public static class CommitSerializer implements IVersionedSerializer<Commit>
     {
         public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException
         {
-            ByteBufferUtil.writeWithShortLength(commit.key, out);
+            if (version < MessagingService.VERSION_30)
+                ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out);
+
             UUIDSerializer.serializer.serialize(commit.ballot, out, version);
-            ColumnFamily.serializer.serialize(commit.update, out, version);
+            PartitionUpdate.serializer.serialize(commit.update, out, version);
         }
 
         public Commit deserialize(DataInput in, int version) throws IOException
         {
-            return new Commit(ByteBufferUtil.readWithShortLength(in),
-                              UUIDSerializer.serializer.deserialize(in, version),
-                              ColumnFamily.serializer.deserialize(in,
-                                                                  ArrayBackedSortedColumns.factory,
-                                                                  ColumnSerializer.Flag.LOCAL,
-                                                                  version));
+            DecoratedKey key = null;
+            if (version < MessagingService.VERSION_30)
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+
+            UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
+            PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key);
+            return new Commit(ballot, update);
         }
 
         public long serializedSize(Commit commit, int version)
         {
-            return 2 + commit.key.remaining()
-                   + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
-                   + ColumnFamily.serializer.serializedSize(commit.update, version);
+            TypeSizes sizes = TypeSizes.NATIVE;
+
+            int size = 0;
+            if (version < MessagingService.VERSION_30)
+                size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey(), sizes);
+
+            return size
+                 + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
+                 + PartitionUpdate.serializer.serializedSize(commit.update, version, sizes);
         }
     }
 }
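
The timestamp rewrite is the subtle part: live cells get the ballot's
microsecond timestamp t while deletions get t-1, so the self-deletion that
accompanies a collection literal insert cannot shadow the values inserted
alongside it. A worked sketch of the invariant (the concrete value is made up):

    long t = 1_435_665_000_000_000L;  // e.g. UUIDGen.microsTimestamp(ballot)
    long cellTimestamp = t;           // live data written at the ballot time
    long tombstoneTimestamp = t - 1;  // row/complex deletions
    // Tombstones win ties against cells, so they must be strictly lower here
    // for the new collection values to survive (#6069):
    assert tombstoneTimestamp < cellTimestamp;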

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..20ccb90 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -39,14 +39,14 @@ public class PaxosState
     private final Commit accepted;
     private final Commit mostRecentCommit;
 
-    public PaxosState(ByteBuffer key, CFMetaData metadata)
+    public PaxosState(DecoratedKey key, CFMetaData metadata)
     {
         this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
     }
 
     public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
     {
-        assert promised.key == accepted.key && accepted.key == mostRecentCommit.key;
+        assert promised.update.partitionKey().equals(accepted.update.partitionKey()) && accepted.update.partitionKey().equals(mostRecentCommit.update.partitionKey());
         assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata();
 
         this.promised = promised;
@@ -59,11 +59,11 @@ public class PaxosState
         long start = System.nanoTime();
         try
         {
-            Lock lock = LOCKS.get(toPrepare.key);
+            Lock lock = LOCKS.get(toPrepare.update.partitionKey());
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
                 if (toPrepare.isAfter(state.promised))
                 {
                     Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -94,11 +94,11 @@ public class PaxosState
         long start = System.nanoTime();
         try
         {
-            Lock lock = LOCKS.get(proposal.key);
+            Lock lock = LOCKS.get(proposal.update.partitionKey());
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
                 if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
                 {
                     Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..7b5edf2 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
-    public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
+    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
     {
         super(targets, consistency);
         // We need to inject the right key into the empty commit so that comparing with empty commits in the reply works as expected