You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:47 UTC

[23/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
new file mode 100644
index 0000000..4edd707
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Locks;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.service.StorageService;
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic Partition implementation.
+ *
+ * Operations (in particular addAll) on this implementation are atomic and
+ * isolated (in the sense of ACID). Typically an addAll is guaranteed that no
+ * other thread can see the state where only parts but not all rows have
+ * been added.
+ */
+public class AtomicBTreePartition implements Partition
+{
+    private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
+
+    public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
+                                                                                       StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+                                                                                       null)); // measured on a throwaway instance (fake table, null allocator); EMPTY below is not yet assigned when this runs, so that instance has a null ref
+
+    // Reserved values for the wasteTracker field. These values must not be consecutive (see avoidReservedValues,
+    // which steps over a reserved value by adding one to it)
+    private static final int TRACKER_NEVER_WASTED = 0; // initial state of wasteTracker
+    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE; // the state usePessimisticLocking() tests for
+
+    // The granularity with which we track wasted allocation/work; we round up
+    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
+    // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
+    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
+    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES); // EXCESS_WASTE_BYTES in ALLOCATION_GRANULARITY_BYTES units (10240)
+    // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
+    private static final int CLOCK_SHIFT = 17;
+    // CLOCK_GRANULARITY = 2^CLOCK_SHIFT ns = 131072ns ~= 131us ~= (1/7.63)ms (i.e. 10^9ns >> CLOCK_SHIFT ~= 7629 ticks per second)
+
+    /**
+     * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
+     * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s
+     *
+     * wasteTracker holds a value on the same scale as the shifted clock (System.nanoTime() >>> CLOCK_SHIFT) that we
+     * maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes we increment the current
+     * value if it is within this window, and set it to the min of the window plus our waste otherwise. If that would
+     * bring the value up to the current time, the tracker is switched to TRACKER_PESSIMISTIC_LOCKING instead.
+     */
+    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
+
+    private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker");
+
+    private static final DeletionInfo LIVE = DeletionInfo.live();
+    // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in this class
+    // before modifying it, so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS);
+
+    private final CFMetaData metadata;
+    private final DecoratedKey partitionKey;
+    private final MemtableAllocator allocator;
+
+    private volatile Holder ref; // current contents of the partition; replaced as a whole through refUpdater
+
+    private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref");
+
+    /**
+     * Creates a partition that starts out with the shared empty contents.
+     *
+     * @param metadata the metadata of the table this partition belongs to.
+     * @param partitionKey the key of this partition.
+     * @param allocator the allocator used for the rows stored in (and read from) this partition.
+     */
+    public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
+    {
+        // Every new partition aliases the same EMPTY holder until its first update
+        this.ref = EMPTY;
+        this.allocator = allocator;
+        this.partitionKey = partitionKey;
+        this.metadata = metadata;
+    }
+
+    /**
+     * Whether this partition currently holds nothing at all.
+     *
+     * @return {@code true} if the partition has a live deletion info, no rows and no static row.
+     */
+    public boolean isEmpty()
+    {
+        // Read the volatile ref exactly once: a concurrent update may swap the holder at any time, and the
+        // three checks below must all be made against the same snapshot to give a coherent answer.
+        Holder current = ref;
+        return current.deletionInfo.isLive() && BTree.isEmpty(current.tree) && current.staticRow == null;
+    }
+
+    /**
+     * @return the metadata this partition was created with.
+     */
+    public CFMetaData metadata()
+    {
+        return this.metadata;
+    }
+
+    /**
+     * @return the key this partition was created with.
+     */
+    public DecoratedKey partitionKey()
+    {
+        return this.partitionKey;
+    }
+
+    /**
+     * @return the partition deletion recorded in the current contents of this partition.
+     */
+    public DeletionTime partitionLevelDeletion()
+    {
+        Holder current = ref;
+        return current.deletionInfo.getPartitionDeletion();
+    }
+
+    /**
+     * @return all the partition columns of the table metadata.
+     */
+    public PartitionColumns columns()
+    {
+        // Which columns the updates will actually touch is unknown here, so report every column of the table
+        PartitionColumns allColumns = metadata.partitionColumns();
+        return allColumns;
+    }
+
+    /**
+     * @return {@code true} if the row btree of the current contents is not empty.
+     */
+    public boolean hasRows()
+    {
+        Holder current = ref;
+        return !BTree.isEmpty(current.tree);
+    }
+
+    /**
+     * @return the stats recorded in the current contents of this partition.
+     */
+    public RowStats stats()
+    {
+        Holder current = ref;
+        return current.stats;
+    }
+
+    public Row getRow(Clustering clustering)
+    {
+        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
+        // Note that for statics, this will never return null, this will return an empty row. However,
+        // it's more consistent for this method to return null if we don't really have a static row.
+        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+    }
+
+    /**
+     * Returns a search iterator over the rows of this partition, operating on the contents the partition
+     * has at the time this method is called.
+     *
+     * Non-static rows are handed back through a single reusable filtering row, so presumably a returned
+     * row should not be held on to across calls to {@code next}.
+     *
+     * @param columns the column filter applied to the rows returned.
+     * @param reversed whether the underlying btree is searched in reverse order.
+     * @return the search iterator.
+     */
+    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+    {
+        // TODO: we could optimize comparison for "NativeRow" à la #6755
+        final Holder current = ref;
+        return new SearchIterator<Clustering, Row>()
+        {
+            private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+            private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
+            private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
+            private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
+
+            public boolean hasNext()
+            {
+                return rawIter.hasNext();
+            }
+
+            public Row next(Clustering key)
+            {
+                if (key == Clustering.STATIC_CLUSTERING)
+                    return makeStatic(columns, current, allocator);
+
+                MemtableRowData data = rawIter.next(key);
+                // Besides the row itself, look for a range tombstone covering this key
+                RangeTombstone rt = current.deletionInfo.rangeCovering(key);
+
+                if (data == null)
+                {
+                    boolean tombstoneApplies = rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion;
+                    if (!tombstoneApplies)
+                        return null;
+
+                    // No data but a range tombstone: "fake" the tombstone by returning an empty row whose
+                    // deletion is the one of the tombstone.
+                    filter.setRowDeletion(rt.deletionTime());
+                    return filter.setTo(emptyDeletedRow(key, rt.deletionTime()));
+                }
+
+                row.setTo(data);
+                filter.setRowDeletion(null);
+
+                if (rt != null && rt.deletionTime().markedForDeleteAt() >= partitionDeletion)
+                {
+                    filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
+                    // When the covering tombstone is bigger than the deletion of the row itself, the row
+                    // deletion is replaced by the tombstone deletion as a way to return the tombstone.
+                    if (rt.deletionTime().supersedes(row.deletion()))
+                        filter.setRowDeletion(rt.deletionTime());
+                }
+                else
+                {
+                    // No covering tombstone, or one older than the partition deletion
+                    filter.setDeletionTimestamp(partitionDeletion);
+                }
+
+                return filter.setTo(row);
+            }
+        };
+    }
+
+    /**
+     * Builds a row that carries no columns and no cells, only a clustering and a deletion.
+     *
+     * @param clustering the clustering reported by the row.
+     * @param deletion the deletion reported by the row.
+     * @return the empty, deleted row.
+     */
+    private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    {
+        return new AbstractRow()
+        {
+            // --- identity of the row ---
+
+            public Clustering clustering()
+            {
+                return clustering;
+            }
+
+            public DeletionTime deletion()
+            {
+                return deletion;
+            }
+
+            public LivenessInfo primaryKeyLivenessInfo()
+            {
+                return LivenessInfo.NONE;
+            }
+
+            public Columns columns()
+            {
+                return Columns.NONE;
+            }
+
+            // --- content: there is none ---
+
+            public boolean isEmpty()
+            {
+                return true;
+            }
+
+            public boolean hasComplexDeletion()
+            {
+                return false;
+            }
+
+            public DeletionTime getDeletion(ColumnDefinition c)
+            {
+                return DeletionTime.LIVE;
+            }
+
+            public Cell getCell(ColumnDefinition c)
+            {
+                return null;
+            }
+
+            public Cell getCell(ColumnDefinition c, CellPath path)
+            {
+                return null;
+            }
+
+            public Iterator<Cell> getCells(ColumnDefinition c)
+            {
+                return null;
+            }
+
+            public Iterator<Cell> iterator()
+            {
+                return Iterators.<Cell>emptyIterator();
+            }
+
+            public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+            {
+                // A search iterator that never has anything to return
+                return new SearchIterator<ColumnDefinition, ColumnData>()
+                {
+                    public ColumnData next(ColumnDefinition column)
+                    {
+                        return null;
+                    }
+
+                    public boolean hasNext()
+                    {
+                        return false;
+                    }
+                };
+            }
+
+            public Row takeAlias()
+            {
+                return this;
+            }
+        };
+    }
+
+    /**
+     * @return an iterator over the whole partition: all columns, all slices, in forward order.
+     */
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        ColumnFilter allColumns = ColumnFilter.selection(columns());
+        return unfilteredIterator(allColumns, Slices.ALL, false);
+    }
+
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        if (slices.size() == 0)
+        {
+            Holder current = ref;
+            DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+            if (selection.fetchedColumns().statics.isEmpty() && partitionDeletion.isLive())
+                return UnfilteredRowIterators.emptyIterator(metadata, partitionKey, reversed);
+
+            return new AbstractUnfilteredRowIterator(metadata,
+                                                     partitionKey,
+                                                     partitionDeletion,
+                                                     selection.fetchedColumns(),
+                                                     makeStatic(selection, current, allocator),
+                                                     reversed,
+                                                     current.stats)
+            {
+                protected Unfiltered computeNext()
+                {
+                    return endOfData();
+                }
+            };
+        }
+
+        return slices.size() == 1
+             ? new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator)
+             : new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator);
+    }
+
+    /**
+     * Builds the static row to expose for a given selection.
+     *
+     * @param selection the column filter of the query.
+     * @param holder the partition contents to read the static row from.
+     * @param allocator the allocator providing the reusable row the static data is read through.
+     * @return {@code Rows.EMPTY_STATIC_ROW} if no static column is fetched or the holder has no static row,
+     * otherwise the static row filtered by the selection and the partition deletion.
+     */
+    private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator)
+    {
+        Columns statics = selection.fetchedColumns().statics;
+        if (statics.isEmpty() || holder.staticRow == null)
+            return Rows.EMPTY_STATIC_ROW;
+
+        ReusableFilteringRow filtered = new ReusableFilteringRow(statics, selection);
+        filtered.setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt());
+        return filtered.setTo(allocator.newReusableRow().setTo(holder.staticRow));
+    }
+
+    /**
+     * A FilteringRow whose deletion timestamp (and optionally row deletion) can be changed between uses.
+     *
+     * Its include callbacks accept only the columns it was created with, the cells its ColumnFilter
+     * includes, and the liveness infos and deletions strictly more recent than the deletion timestamp
+     * currently set.
+     */
+    private static class ReusableFilteringRow extends FilteringRow
+    {
+        private final ColumnFilter selection;
+        private final Columns columns;
+        private ColumnFilter.Tester tester;
+
+        private long deletionTimestamp;
+
+        // Used by searchIterator in case the row is covered by a tombstone.
+        private DeletionTime rowDeletion;
+
+        public ReusableFilteringRow(Columns columns, ColumnFilter selection)
+        {
+            this.selection = selection;
+            this.columns = columns;
+        }
+
+        public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion)
+        {
+            this.rowDeletion = rowDeletion;
+            return this;
+        }
+
+        public ReusableFilteringRow setDeletionTimestamp(long timestamp)
+        {
+            this.deletionTimestamp = timestamp;
+            return this;
+        }
+
+        // Whether something stamped with the given timestamp is more recent than the current deletion
+        private boolean isAfterDeletion(long timestamp)
+        {
+            return timestamp > deletionTimestamp;
+        }
+
+        @Override
+        public DeletionTime deletion()
+        {
+            // An explicitly set row deletion takes precedence over the one of the wrapped row
+            if (rowDeletion != null)
+                return rowDeletion;
+            return super.deletion();
+        }
+
+        @Override
+        protected boolean include(ColumnDefinition def)
+        {
+            return columns.contains(def);
+        }
+
+        @Override
+        protected boolean include(Cell cell)
+        {
+            return selection.includes(cell);
+        }
+
+        @Override
+        protected boolean include(LivenessInfo info)
+        {
+            return isAfterDeletion(info.timestamp());
+        }
+
+        @Override
+        protected boolean include(DeletionTime dt)
+        {
+            return isAfterDeletion(dt.markedForDeleteAt());
+        }
+
+        @Override
+        protected boolean include(ColumnDefinition c, DeletionTime dt)
+        {
+            return isAfterDeletion(dt.markedForDeleteAt());
+        }
+    }
+
+    /**
+     * Iterator over the rows and range tombstone markers of one slice of a partition snapshot.
+     */
+    private static class SingleSliceIterator extends AbstractUnfilteredRowIterator
+    {
+        private final Iterator<Unfiltered> iterator;
+        private final ReusableFilteringRow row;
+
+        private SingleSliceIterator(CFMetaData metadata,
+                                    DecoratedKey key,
+                                    Holder holder,
+                                    ColumnFilter selection,
+                                    Slice slice,
+                                    boolean isReversed,
+                                    MemtableAllocator allocator)
+        {
+            super(metadata,
+                  key,
+                  holder.deletionInfo.getPartitionDeletion(),
+                  selection.fetchedColumns(),
+                  makeStatic(selection, holder, allocator),
+                  isReversed,
+                  holder.stats);
+
+            Iterator<Row> rows = rowIter(metadata, holder, slice, !isReversed, allocator);
+
+            // Rows and the range tombstones of the slice are merged into a single stream
+            RowAndTombstoneMergeIterator merged = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed);
+            this.iterator = merged.setTo(rows, holder.deletionInfo.rangeIterator(slice, isReversed));
+
+            // Until a marker says otherwise, rows are filtered against the partition level deletion
+            ReusableFilteringRow filtering = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection);
+            filtering.setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt());
+            this.row = filtering;
+        }
+
+        // Iterates over the rows of the holder's btree that fall within the slice, through one reusable row
+        private Iterator<Row> rowIter(CFMetaData metadata,
+                                      Holder holder,
+                                      Slice slice,
+                                      boolean forwards,
+                                      final MemtableAllocator allocator)
+        {
+            // BOTTOM and TOP are handed to the btree as null bounds
+            Slice.Bound start = slice.start();
+            if (start == Slice.Bound.BOTTOM)
+                start = null;
+            Slice.Bound end = slice.end();
+            if (end == Slice.Bound.TOP)
+                end = null;
+
+            final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards);
+            return new AbstractIterator<Row>()
+            {
+                private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
+
+                protected Row computeNext()
+                {
+                    if (!dataIter.hasNext())
+                        return endOfData();
+                    return row.setTo(dataIter.next());
+                }
+            };
+        }
+
+        protected Unfiltered computeNext()
+        {
+            while (iterator.hasNext())
+            {
+                Unfiltered next = iterator.next();
+                if (next.kind() != Unfiltered.Kind.ROW)
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker)next;
+
+                    // From this marker on, rows are filtered against the partition deletion or, if the
+                    // marker opens a range, against the more recent of that and the opened deletion.
+                    long deletion = partitionLevelDeletion().markedForDeleteAt();
+                    if (marker.isOpen(isReverseOrder()))
+                        deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt());
+                    row.setDeletionTimestamp(deletion);
+                    return marker;
+                }
+
+                // Rows left empty by the filtering are skipped
+                row.setTo((Row)next);
+                if (!row.isEmpty())
+                    return row;
+            }
+            return endOfData();
+        }
+    }
+
+    /**
+     * Iterator over several slices of a partition snapshot, implemented by chaining one
+     * SingleSliceIterator per slice (taking the slices last to first when iterating in reverse order).
+     */
+    public static class SlicesIterator extends AbstractUnfilteredRowIterator
+    {
+        private final Holder holder;
+        private final MemtableAllocator allocator;
+        private final ColumnFilter selection;
+        private final Slices slices;
+
+        // Number of slices for which an iterator has been created so far
+        private int idx;
+        // Iterator over the slice being consumed, or null if the next one must be created
+        private UnfilteredRowIterator currentSlice;
+
+        private SlicesIterator(CFMetaData metadata,
+                               DecoratedKey key,
+                               Holder holder,
+                               ColumnFilter selection,
+                               Slices slices,
+                               boolean isReversed,
+                               MemtableAllocator allocator)
+        {
+            super(metadata,
+                  key,
+                  holder.deletionInfo.getPartitionDeletion(),
+                  selection.fetchedColumns(),
+                  makeStatic(selection, holder, allocator),
+                  isReversed,
+                  holder.stats);
+            this.slices = slices;
+            this.allocator = allocator;
+            this.selection = selection;
+            this.holder = holder;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            for (;;)
+            {
+                if (currentSlice != null)
+                {
+                    if (currentSlice.hasNext())
+                        return currentSlice.next();
+
+                    // This slice is exhausted, move on to the next one
+                    currentSlice = null;
+                }
+
+                if (idx >= slices.size())
+                    return endOfData();
+
+                int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
+                currentSlice = new SingleSliceIterator(metadata,
+                                                       partitionKey,
+                                                       holder,
+                                                       selection,
+                                                       slices.get(sliceIdx),
+                                                       isReverseOrder,
+                                                       allocator);
+                idx++;
+            }
+        }
+    }
+
+    /**
+     * Adds a given update to this in-memtable partition.
+     *
+     * The merge is first attempted optimistically: new contents are built from the current Holder and installed
+     * with a compare-and-set on ref; if that fails (or if the btree update returned null) the attempt is redone
+     * from scratch against the then-current Holder. After a failed attempt made without holding the monitor, the
+     * heap size recorded for that attempt is reported to the waste tracker and, once pessimistic locking is called
+     * for, Locks.monitorEnterUnsafe(this) is invoked before retrying; the matching monitorExitUnsafe is done in
+     * the finally block.
+     *
+     * @param update the update to merge into this partition.
+     * @param writeOp the write operation group, passed on to the row allocator and to the on-heap allocation
+     * accounting done once the update has been applied.
+     * @param indexer the secondary index updater, passed on to the row merging and asked to update the row level
+     * indexes once the update has been applied.
+     * @return an array containing first the difference in size seen after merging the updates, and second the minimum
+     * time delta between updates.
+     */
+    public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer)
+    {
+        RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer);
+        DeletionInfo inputDeletionInfoCopy = null;
+
+        boolean monitorOwned = false; // whether this thread must exit the monitor in the finally block
+        try
+        {
+            if (usePessimisticLocking())
+            {
+                Locks.monitorEnterUnsafe(this);
+                monitorOwned = true;
+            }
+            while (true)
+            {
+                Holder current = ref;
+                updater.ref = current; // lets abortEarly() detect that ref has been replaced since
+                updater.reset(); // aborts the rows recorded by a previous failed attempt and zeroes the size counters
+
+                DeletionInfo deletionInfo;
+                if (update.deletionInfo().mayModify(current.deletionInfo))
+                {
+                    if (inputDeletionInfoCopy == null)
+                        inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance); // copied at most once, then reused across retries
+
+                    deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); // the current DeletionInfo is copied, never modified in place
+                    updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+                }
+                else
+                {
+                    deletionInfo = current.deletionInfo;
+                }
+
+                Row newStatic = update.staticRow();
+                MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
+                                          ? current.staticRow
+                                          : (current.staticRow == null ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
+                Object[] tree = BTree.<Clusterable, Row, MemtableRowData>update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
+                RowStats newStats = current.stats.mergeWith(update.stats());
+
+                if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
+                {
+                    indexer.updateRowLevelIndexes();
+                    updater.finish();
+                    return new long[]{ updater.dataSize, updater.colUpdateTimeDelta };
+                }
+                else if (!monitorOwned)
+                {
+                    boolean shouldLock = usePessimisticLocking();
+                    if (!shouldLock)
+                    {
+                        shouldLock = updateWastedAllocationTracker(updater.heapSize); // account for the work this failed attempt threw away
+                    }
+                    if (shouldLock)
+                    {
+                        Locks.monitorEnterUnsafe(this);
+                        monitorOwned = true;
+                    }
+                }
+            }
+        }
+        finally
+        {
+            if (monitorOwned)
+                Locks.monitorExitUnsafe(this);
+        }
+
+    }
+
+    /**
+     * @return {@code true} if the waste tracker of this partition is in the pessimistic locking state.
+     */
+    public boolean usePessimisticLocking()
+    {
+        return TRACKER_PESSIMISTIC_LOCKING == wasteTracker;
+    }
+
+    /**
+     * Update the wasted allocation tracker state based on newly wasted allocation information
+     *
+     * The waste is converted to ALLOCATION_GRANULARITY_BYTES units and added to the distance between the tracker
+     * and the current (shifted) clock value. As long as the tracker stays behind the clock, the new tracker value is
+     * published with a compare-and-set (retried on contention) and false is returned. When the waste is at least
+     * EXCESS_WASTE_BYTES, or would bring the tracker up to the clock, or the tracker is already in the pessimistic
+     * locking state, the tracker is set to TRACKER_PESSIMISTIC_LOCKING and true is returned.
+     *
+     * @param wastedBytes the number of bytes wasted by this thread
+     * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
+     */
+    private boolean updateWastedAllocationTracker(long wastedBytes)
+    {
+        // Early check for huge allocation that exceeds the limit
+        if (wastedBytes < EXCESS_WASTE_BYTES)
+        {
+            // We round up to ensure work < granularity are still accounted for
+            int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES; // the int cast cannot overflow on the high side: wastedBytes is below EXCESS_WASTE_BYTES here
+
+            int oldTrackerValue;
+            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
+            {
+                // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
+                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
+                int delta = oldTrackerValue - time; // negative while the tracker is behind the clock
+                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET) // tracker unset, not behind the clock, or fallen out of the window
+                    delta = -EXCESS_WASTE_OFFSET; // restart from the bottom of the window
+                delta += wastedAllocation;
+                if (delta >= 0) // the tracker would reach the clock: limit hit, fall through to pessimistic locking
+                    break;
+                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta))) // on a lost race, re-read the tracker and recompute
+                    return false;
+            }
+        }
+        // We have definitely reached our waste limit so set the state if it isn't already
+        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
+        // And tell the caller to proceed with pessimistic locking
+        return true;
+    }
+
+    /**
+     * Makes sure a computed tracker value is not mistaken for one of the reserved states.
+     *
+     * @param wasteTracker the tracker value about to be stored.
+     * @return the value unchanged, unless it equals TRACKER_NEVER_WASTED or TRACKER_PESSIMISTIC_LOCKING, in which
+     * case the value plus one is returned (with the usual int wrap-around for Integer.MAX_VALUE).
+     */
+    private static int avoidReservedValues(int wasteTracker)
+    {
+        boolean reserved = wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
+        return reserved ? wasteTracker + 1 : wasteTracker;
+    }
+
+    /**
+     * The contents of a partition at one point in time: rows, deletion info, static row and stats.
+     * All fields are final; the partition changes by swapping in a new Holder.
+     */
+    private static final class Holder
+    {
+        // the btree of rows
+        final Object[] tree;
+        final DeletionInfo deletionInfo;
+        // null when there is no static row
+        final MemtableRowData staticRow;
+        final RowStats stats;
+
+        Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats)
+        {
+            this.stats = stats;
+            this.staticRow = staticRow;
+            this.deletionInfo = deletionInfo;
+            this.tree = tree;
+        }
+
+        // Same contents as this holder, except for the deletion info
+        Holder with(DeletionInfo info)
+        {
+            return new Holder(tree, info, staticRow, stats);
+        }
+    }
+
+    // The function handed to the btree utilities to insert new rows and merge updates into existing ones.
+    // It also keeps track of the data/heap size changes and of the rows allocated by the current attempt,
+    // so that a failed attempt can be undone (reset) and a successful one accounted for (finish).
+    private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData>
+    {
+        final AtomicBTreePartition updating;
+        final MemtableAllocator allocator;
+        final OpOrder.Group writeOp;
+        final Updater indexer;
+        final int nowInSec;
+        Holder ref;
+        long dataSize;
+        long heapSize;
+        long colUpdateTimeDelta = Long.MAX_VALUE;
+        final MemtableRowData.ReusableRow row;
+        final MemtableAllocator.DataReclaimer reclaimer;
+        final MemtableAllocator.RowAllocator rowAllocator;
+        List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree
+
+        private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+        {
+            this.updating = updating;
+            this.allocator = allocator;
+            this.writeOp = writeOp;
+            this.indexer = indexer;
+            this.nowInSec = FBUtilities.nowInSeconds();
+            this.row = allocator.newReusableRow();
+            this.reclaimer = allocator.reclaimer();
+            this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp);
+        }
+
+        // Called for a row that has no existing counterpart: copies it into newly allocated row data
+        public MemtableRowData apply(Row insert)
+        {
+            rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic());
+            insert.copyTo(rowAllocator);
+            MemtableRowData copied = rowAllocator.allocatedRowData();
+
+            insertIntoIndexes(insert);
+
+            dataSize += copied.dataSize();
+            heapSize += copied.unsharedHeapSizeExcludingData();
+            remember(copied);
+            return copied;
+        }
+
+        // Called for a row that already exists: merges the update into newly allocated row data and
+        // marks the replaced data as to be reclaimed
+        public MemtableRowData apply(MemtableRowData existing, Row update)
+        {
+            Columns mergedColumns = existing.columns().mergeTo(update.columns());
+            rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic());
+
+            long timeDelta = Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer);
+            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, timeDelta);
+
+            MemtableRowData reconciled = rowAllocator.allocatedRowData();
+
+            // Only the difference with what was there before counts
+            dataSize += reconciled.dataSize() - existing.dataSize();
+            heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+            remember(reconciled);
+            discard(existing);
+
+            return reconciled;
+        }
+
+        // Records row data allocated by the current attempt, so reset() can abort it if the attempt fails
+        private void remember(MemtableRowData allocated)
+        {
+            if (inserted == null)
+                inserted = new ArrayList<>();
+            inserted.add(allocated);
+        }
+
+        private void insertIntoIndexes(Row toInsert)
+        {
+            if (indexer != SecondaryIndexManager.nullUpdater)
+            {
+                maybeIndexPrimaryKeyColumns(toInsert);
+                Clustering clustering = toInsert.clustering();
+                for (Cell cell : toInsert)
+                    indexer.insert(clustering, cell);
+            }
+        }
+
+        private void maybeIndexPrimaryKeyColumns(Row row)
+        {
+            // A primary key index is updated with the most up to date info contained in the inserted row (if only
+            // for backward compatibility). Note that if there is an index but not a partition key or clustering
+            // column one, this work is wasted. It might be avoidable if row indexing was pushed in the index updater.
+            LivenessInfo primaryKeyInfo = row.primaryKeyLivenessInfo();
+            long timestamp = primaryKeyInfo.timestamp();
+            int ttl = row.primaryKeyLivenessInfo().ttl();
+
+            // The most recent live cell, if more recent than the primary key info, provides timestamp and ttl
+            for (Cell cell : row)
+            {
+                long cellTimestamp = cell.livenessInfo().timestamp();
+                if (cell.isLive(nowInSec) && cellTimestamp > timestamp)
+                {
+                    timestamp = cellTimestamp;
+                    ttl = cell.livenessInfo().ttl();
+                }
+            }
+
+            indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion());
+        }
+
+        // Prepares for a new attempt: forgets the sizes, aborts the rows allocated by the previous attempt
+        // and cancels the reclaims it had registered
+        protected void reset()
+        {
+            dataSize = 0;
+            heapSize = 0;
+            if (inserted != null)
+            {
+                for (int i = 0, count = inserted.size(); i < count; i++)
+                    abort(inserted.get(i));
+                inserted.clear();
+            }
+            reclaimer.cancel();
+        }
+
+        protected void abort(MemtableRowData abort)
+        {
+            reclaimer.reclaimImmediately(abort);
+        }
+
+        protected void discard(MemtableRowData discard)
+        {
+            reclaimer.reclaim(discard);
+        }
+
+        // The btree update can stop early once the partition contents are no longer the ones it started from
+        public boolean abortEarly()
+        {
+            return ref != updating.ref;
+        }
+
+        public void allocated(long heapSize)
+        {
+            this.heapSize += heapSize;
+        }
+
+        // Called once the update has been applied: accounts for the heap used and commits the reclaims
+        protected void finish()
+        {
+            allocator.onHeap().allocate(heapSize, writeOp);
+            reclaimer.commit();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
new file mode 100644
index 0000000..dc482f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.ISerializer;
+
+/**
+ * A partition stored in the partition cache.
+ *
+ * Note that in practice, the only implementation of this is {@link ArrayBackedPartition};
+ * we keep this interface mainly 1) to make it clear what we need from a partition in the cache
+ * (that we don't otherwise) and 2) because {@code ArrayBackedPartition} is used for other
+ * purposes (than caching) and hence using {@code CachedPartition} when we talk about caching is
+ * clearer.
+ *
+ * NOTE(review): the serializer below is {@code ArrayBackedCachedPartition.Serializer}, so the
+ * implementation referred to above may actually be {@code ArrayBackedCachedPartition} -- confirm.
+ */
+public interface CachedPartition extends Partition, IRowCacheEntry
+{
+    // Serializer used for cached partitions.
+    public static final ISerializer<CachedPartition> cacheSerializer = new ArrayBackedCachedPartition.Serializer();
+
+    /**
+     * The number of {@code Row} objects in this cached partition.
+     *
+     * Please note that this is <b>not</b> the number of <em>live</em> rows since
+     * some of the rows may only contain deleted (or expired) information.
+     *
+     * @return the number of rows in the partition.
+     */
+    public int rowCount();
+
+    /**
+     * The number of rows that were live at the time the partition was cached.
+     *
+     * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this.
+     *
+     * @return the number of rows in this partition that were live at the time the
+     * partition was cached (this can be different from the number of live rows now
+     * due to expiring cells).
+     */
+    public int cachedLiveRows();
+
+    /**
+     * The number of rows in this cached partition that have at least one non-expiring
+     * non-deleted cell.
+     *
+     * Note that this is generally not a very meaningful number, but this is used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of rows that have at least one non-expiring non-deleted cell.
+     */
+    public int rowsWithNonExpiringCells();
+
+    /**
+     * The last row in this cached partition (in other words, the row with the
+     * biggest clustering that the partition contains).
+     *
+     * @return the last row of the partition, or {@code null} if the partition is empty.
+     */
+    public Row lastRow();
+
+    /**
+     * The number of {@code Cell} objects that are not tombstones in this cached partition.
+     *
+     * Please note that this is <b>not</b> the number of <em>live</em> cells since
+     * some of the cells might be expired.
+     *
+     * @return the number of non-tombstone cells in the partition.
+     */
+    public int nonTombstoneCellCount();
+
+    /**
+     * The number of cells in this cached partition that are neither tombstones nor expiring.
+     *
+     * Note that this is generally not a very meaningful number, but this is used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of cells that are neither tombstones nor expiring.
+     */
+    public int nonExpiringLiveCells();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
new file mode 100644
index 0000000..16445e7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+/**
+ * A {@code PartitionIterator} wrapper that shares a {@code DataLimits.Counter} with the
+ * row iterators it returns, and reports no further partitions once that counter is done.
+ */
+public class CountingPartitionIterator extends WrappingPartitionIterator
+{
+    protected final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code result}, counting with the provided counter.
+     */
+    public CountingPartitionIterator(PartitionIterator result, DataLimits.Counter counter)
+    {
+        super(result);
+        this.counter = counter;
+    }
+
+    /**
+     * Wraps {@code result}, counting with a new counter obtained from {@code limits}.
+     */
+    public CountingPartitionIterator(PartitionIterator result, DataLimits limits, int nowInSec)
+    {
+        this(result, limits.newCounter(nowInSec, true));
+    }
+
+    /**
+     * @return the counter used by this iterator.
+     */
+    public DataLimits.Counter counter()
+    {
+        return counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // The wrapped iterator is only consulted while the counter is not done.
+        return !counter.isDone() && super.hasNext();
+    }
+
+    @Override
+    @SuppressWarnings("resource") // Close through the closing of the returned 'CountingRowIterator' (and CountingRowIterator shouldn't throw)
+    public RowIterator next()
+    {
+        RowIterator partition = super.next();
+        return new CountingRowIterator(partition, counter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
new file mode 100644
index 0000000..4ad321e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+/**
+ * A {@code RowIterator} wrapper that feeds a {@code DataLimits.Counter}: the counter is
+ * told about the partition on construction, about each row as it is returned, and about
+ * the end of the partition on close.
+ */
+public class CountingRowIterator extends WrappingRowIterator
+{
+    protected final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code iter} and immediately notifies the counter of the new partition.
+     */
+    public CountingRowIterator(RowIterator iter, DataLimits.Counter counter)
+    {
+        super(iter);
+        this.counter = counter;
+
+        counter.newPartition(iter.partitionKey(), iter.staticRow());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // The wrapped iterator is only consulted while the counter is not done for this partition.
+        return !counter.isDoneForPartition() && super.hasNext();
+    }
+
+    @Override
+    public Row next()
+    {
+        Row returned = super.next();
+        counter.newRow(returned);
+        return returned;
+    }
+
+    @Override
+    public void close()
+    {
+        // Close the wrapped iterator first, then tell the counter the partition is over.
+        super.close();
+        counter.endOfPartition();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..52eedd4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+/**
+ * An {@code UnfilteredPartitionIterator} wrapper that shares a {@code DataLimits.Counter}
+ * with the row iterators it produces, and reports no further partitions once that counter
+ * is done.
+ */
+public class CountingUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator
+{
+    protected final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code result}, counting with the provided counter.
+     */
+    public CountingUnfilteredPartitionIterator(UnfilteredPartitionIterator result, DataLimits.Counter counter)
+    {
+        super(result);
+        this.counter = counter;
+    }
+
+    /**
+     * @return the counter used by this iterator.
+     */
+    public DataLimits.Counter counter()
+    {
+        return counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // The wrapped iterator is only consulted while the counter is not done.
+        return !counter.isDone() && super.hasNext();
+    }
+
+    @Override
+    public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    {
+        // Each partition is wrapped so that its rows are counted with the shared counter.
+        UnfilteredRowIterator counted = new CountingUnfilteredRowIterator(iter, counter);
+        return counted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
new file mode 100644
index 0000000..acaef5d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+{
+    private final DataLimits.Counter counter;
+
+    public CountingUnfilteredRowIterator(UnfilteredRowIterator iter, DataLimits.Counter counter)
+    {
+        super(iter);
+        this.counter = counter;
+
+        counter.newPartition(iter.partitionKey(), iter.staticRow());
+    }
+
+    public DataLimits.Counter counter()
+    {
+        return counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        if (counter.isDoneForPartition())
+            return false;
+
+        return super.hasNext();
+    }
+
+    @Override
+    public Unfiltered next()
+    {
+        Unfiltered unfiltered = super.next();
+        if (unfiltered.kind() == Unfiltered.Kind.ROW)
+            counter.newRow((Row) unfiltered);
+        return unfiltered;
+    }
+
+    @Override
+    public void close()
+    {
+        super.close();
+        counter.endOfPartition();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
new file mode 100644
index 0000000..813654d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+public class FilteredPartition extends AbstractPartitionData implements Iterable<Row>
+{
+    private FilteredPartition(CFMetaData metadata,
+                              DecoratedKey partitionKey,
+                              PartitionColumns columns,
+                              int initialRowCapacity,
+                              boolean sortable)
+    {
+        super(metadata, partitionKey, DeletionTime.LIVE, columns, initialRowCapacity, sortable);
+    }
+
+    /**
+     * Create a FilteredPartition holding all the rows of the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and it is
+     * up to the caller to do so.
+     */
+    public static FilteredPartition create(RowIterator iterator)
+    {
+        FilteredPartition partition = new FilteredPartition(iterator.metadata(),
+                                                            iterator.partitionKey(),
+                                                            iterator.columns(),
+                                                            4,
+                                                            iterator.isReverseOrder());
+
+        partition.staticRow = iterator.staticRow().takeAlias();
+
+        Writer writer = partition.new Writer(true);
+
+        while (iterator.hasNext())
+            iterator.next().copyTo(writer);
+
+        // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering
+        // order. So if we've just added them in reverse clustering order, reverse them.
+        if (iterator.isReverseOrder())
+            partition.reverse();
+
+        return partition;
+    }
+
+    public RowIterator rowIterator()
+    {
+        final Iterator<Row> iter = iterator();
+        return new RowIterator()
+        {
+            public CFMetaData metadata()
+            {
+                return metadata;
+            }
+
+            public boolean isReverseOrder()
+            {
+                return false;
+            }
+
+            public PartitionColumns columns()
+            {
+                return columns;
+            }
+
+            public DecoratedKey partitionKey()
+            {
+                return key;
+            }
+
+            public Row staticRow()
+            {
+                return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+            }
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Row next()
+            {
+                return iter.next();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+            }
+        };
+    }
+
+    @Override
+    public String toString()
+    {
+        try (RowIterator iterator = rowIterator())
+        {
+            StringBuilder sb = new StringBuilder();
+            CFMetaData metadata = iterator.metadata();
+            PartitionColumns columns = iterator.columns();
+
+            sb.append(String.format("[%s.%s] key=%s columns=%s reversed=%b",
+                                    metadata.ksName,
+                                    metadata.cfName,
+                                    metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                                    columns,
+                                    iterator.isReverseOrder()));
+
+            if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+                sb.append("\n    ").append(iterator.staticRow().toString(metadata));
+
+            while (iterator.hasNext())
+                sb.append("\n    ").append(iterator.next().toString(metadata));
+
+            return sb.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
new file mode 100644
index 0000000..c40109b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * Abstract class to make it easier to write iterators that filter some
+ * parts of another iterator (used for purging tombstones and removing dropped columns).
+ */
+public abstract class FilteringPartitionIterator extends WrappingUnfilteredPartitionIterator
+{
+    private UnfilteredRowIterator next;
+
+    protected FilteringPartitionIterator(UnfilteredPartitionIterator iter)
+    {
+        super(iter);
+    }
+
+    // The filter to use for filtering row contents. Is null by default to mean no particular filtering
+    // but can be overriden by subclasses. Please see FilteringAtomIterator for details on how this is used.
+    protected FilteringRow makeRowFilter()
+    {
+        return null;
+    }
+
+    // Whether or not we should bother filtering the provided rows iterator. This
+    // exists mainly for preformance
+    protected boolean shouldFilter(UnfilteredRowIterator iterator)
+    {
+        return true;
+    }
+
+    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+    {
+        return true;
+    }
+
+    protected boolean includePartitionDeletion(DeletionTime dt)
+    {
+        return true;
+    }
+
+    // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called.
+    protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+    {
+        return marker;
+    }
+
+    // Called when a particular partition is skipped due to being empty post filtering
+    protected void onEmpty(DecoratedKey key)
+    {
+    }
+
+    public boolean hasNext()
+    {
+        while (next == null && super.hasNext())
+        {
+            UnfilteredRowIterator iterator = super.next();
+            if (shouldFilter(iterator))
+            {
+                next = new FilteringIterator(iterator);
+                if (!isForThrift() && next.isEmpty())
+                {
+                    onEmpty(iterator.partitionKey());
+                    iterator.close();
+                    next = null;
+                }
+            }
+            else
+            {
+                next = iterator;
+            }
+        }
+        return next != null;
+    }
+
+    public UnfilteredRowIterator next()
+    {
+        UnfilteredRowIterator toReturn = next;
+        next = null;
+        return toReturn;
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            super.close();
+        }
+        finally
+        {
+            if (next != null)
+                next.close();
+        }
+    }
+
+    private class FilteringIterator extends FilteringRowIterator
+    {
+        private FilteringIterator(UnfilteredRowIterator iterator)
+        {
+            super(iterator);
+        }
+
+        @Override
+        protected FilteringRow makeRowFilter()
+        {
+            return FilteringPartitionIterator.this.makeRowFilter();
+        }
+
+        @Override
+        protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+        {
+            return FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker);
+        }
+
+        @Override
+        protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+        {
+            return FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed);
+        }
+
+        @Override
+        protected boolean includePartitionDeletion(DeletionTime dt)
+        {
+            return FilteringPartitionIterator.this.includePartitionDeletion(dt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/Partition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
new file mode 100644
index 0000000..71d0411
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * In-memory representation of a Partition.
+ *
+ * Note that most of the storage engine works through iterators (UnfilteredPartitionIterator) to
+ * avoid "materializing" a full partition/query response in memory as much as possible,
+ * and so Partition objects should be used as sparingly as possible. There are a couple
+ * of cases where we do need to represent a partition in-memory (memtables and row cache).
+ */
+public interface Partition
+{
+    public CFMetaData metadata();
+    public DecoratedKey partitionKey();
+    public DeletionTime partitionLevelDeletion();
+
+    public PartitionColumns columns();
+
+    public RowStats stats();
+
+    /**
+     * Whether the partition object has no information at all, including any deletion information.
+     */
+    public boolean isEmpty();
+
+    /**
+     * Returns the row corresponding to the provided clustering, or null if there is no such row.
+     */
+    public Row getRow(Clustering clustering);
+
+    /**
+     * Returns an iterator that allows searching for specific rows efficiently.
+     */
+    public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed);
+
+    /**
+     * Returns an UnfilteredRowIterator over all the rows/RT contained by this partition.
+     */
+    public UnfilteredRowIterator unfilteredIterator();
+
+    /**
+     * Returns an UnfilteredRowIterator over the rows/RT contained by this partition
+     * selected by the provided slices.
+     */
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
new file mode 100644
index 0000000..36358fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * An iterator over a number of (filtered) partitions.
+ *
+ * PartitionIterator is to RowIterator what UnfilteredPartitionIterator is to UnfilteredRowIterator
+ * though unlike UnfilteredPartitionIterator, it is not guaranteed that the RowIterators
+ * returned are in partitioner order.
+ *
+ * The object returned by a call to next() is only guaranteed to be
+ * valid until the next call to hasNext() or next(). If a consumer wants to keep a
+ * reference on the returned objects for longer than the iteration, it must
+ * make a copy of it explicitly.
+ */
+public interface PartitionIterator extends Iterator<RowIterator>, AutoCloseable
+{
+    // Redeclared without a throws clause, so closing does not require handling a checked exception.
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
new file mode 100644
index 0000000..219aa5a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Static helpers to create, combine and consume {@link PartitionIterator}s.
+ */
+public abstract class PartitionIterators
+{
+    private PartitionIterators() {}
+
+    /** A shared {@link PartitionIterator} that never returns any partition. */
+    public static final PartitionIterator EMPTY = new PartitionIterator()
+    {
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public RowIterator next()
+        {
+            throw new NoSuchElementException();
+        }
+
+        public void remove()
+        {
+            // Removal is not supported: fail loudly, like the iterator returned by concat(),
+            // rather than silently ignoring the call.
+            throw new UnsupportedOperationException();
+        }
+
+        public void close()
+        {
+        }
+    };
+
+    /**
+     * Returns the first partition of the provided iterator as a {@link RowIterator}.
+     * <p>
+     * If {@code iter} has no partition, an empty RowIterator built from the metadata, the partition
+     * key and the reversed flag of {@code command} is returned instead. Closing the returned
+     * iterator also closes {@code iter}.
+     *
+     * @param iter the partition iterator to extract the partition from. It is expected to hold at
+     * most one partition, which is only checked (through an assertion) when the result is closed.
+     * @param command the command used to build the empty result when {@code iter} is empty.
+     * @return a RowIterator whose {@code close()} closes both the wrapped partition and {@code iter}.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command)
+    {
+        // If the query has no results, we'll get an empty iterator, but we still
+        // want a RowIterator out of this method, so we return an empty one.
+        RowIterator toReturn = iter.hasNext()
+                             ? iter.next()
+                             : RowIterators.emptyIterator(command.metadata(),
+                                                          command.partitionKey(),
+                                                          command.clusteringIndexFilter().isReversed());
+
+        // Wrap the result so that its close method actually closes the whole PartitionIterator.
+        return new WrappingRowIterator(toReturn)
+        {
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    try
+                    {
+                        // Asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which
+                        // might be used under the provided iterator) if hasNext() is called before the previously
+                        // returned iterator has been fully consumed.
+                        assert !iter.hasNext();
+                    }
+                    finally
+                    {
+                        // Must run even if the check above fails or throws, otherwise iter is leaked.
+                        iter.close();
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns an iterator that goes through the partitions of each of the provided iterators, in
+     * list order.
+     * <p>
+     * If the list holds exactly one iterator, that iterator is returned as is. Otherwise, closing
+     * the result hands the whole list to {@code FileUtils.closeQuietly}.
+     *
+     * @param iterators the iterators to concatenate.
+     * @return an iterator over the partitions of all of {@code iterators}.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static PartitionIterator concat(final List<PartitionIterator> iterators)
+    {
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        return new PartitionIterator()
+        {
+            // Index of the iterator currently being consumed
+            private int idx = 0;
+
+            public boolean hasNext()
+            {
+                // Skip over exhausted iterators until one with a partition is found
+                while (idx < iterators.size())
+                {
+                    if (iterators.get(idx).hasNext())
+                        return true;
+
+                    ++idx;
+                }
+                return false;
+            }
+
+            public RowIterator next()
+            {
+                // hasNext() also positions idx on the next non-exhausted iterator
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                return iterators.get(idx).next();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                FileUtils.closeQuietly(iterators);
+            }
+        };
+    }
+
+    /**
+     * Feeds every partition of the provided iterator to the provided digest, closing each
+     * partition once it has been digested. The provided iterator itself is not closed.
+     *
+     * @param iterator the iterator whose partitions are digested.
+     * @param digest the digest to update.
+     */
+    public static void digest(PartitionIterator iterator, MessageDigest digest)
+    {
+        while (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
+            {
+                RowIterators.digest(partition, digest);
+            }
+        }
+    }
+
+    /**
+     * Wraps a single partition into a {@link PartitionIterator} that returns only that partition.
+     * <p>
+     * Closing the result closes {@code iterator} unless it has already been returned by a call
+     * to {@code next()}, in which case closing it is up to whoever received it.
+     *
+     * @param iterator the partition to wrap.
+     * @return a PartitionIterator returning {@code iterator} exactly once.
+     */
+    public static PartitionIterator singletonIterator(RowIterator iterator)
+    {
+        return new SingletonPartitionIterator(iterator);
+    }
+
+    /**
+     * Iterates over every row of every partition of the provided iterator, discarding them, and
+     * closes each partition once exhausted. The provided iterator itself is not closed.
+     *
+     * @param iterator the iterator to exhaust.
+     */
+    public static void consume(PartitionIterator iterator)
+    {
+        while (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
+            {
+                while (partition.hasNext())
+                    partition.next();
+            }
+        }
+    }
+
+    /**
+     * Wraps the provided iterator so it logs the returned rows for debugging purposes.
+     * <p>
+     * Note that this is only meant for debugging as this can generate a very large amount of
+     * logging at INFO.
+     *
+     * @param iterator the iterator to wrap.
+     * @param id an identifier passed along to {@code RowIterators.loggingIterator} for each partition.
+     * @return an iterator whose partitions are wrapped by {@code RowIterators.loggingIterator}.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static PartitionIterator loggingIterator(PartitionIterator iterator, final String id)
+    {
+        return new WrappingPartitionIterator(iterator)
+        {
+            public RowIterator next()
+            {
+                return RowIterators.loggingIterator(super.next(), id);
+            }
+        };
+    }
+
+    /**
+     * A {@link PartitionIterator} over exactly one partition.
+     * <p>
+     * The wrapped partition is owned by this iterator until {@code next()} hands it out; from then
+     * on the consumer that received it is in charge of closing it, and {@code close()} leaves it alone
+     * so that it does not get closed twice.
+     */
+    private static class SingletonPartitionIterator implements PartitionIterator
+    {
+        private final RowIterator iterator;
+        // Set by next() only, so that merely calling hasNext() does not give up ownership
+        private boolean returned;
+
+        private SingletonPartitionIterator(RowIterator iterator)
+        {
+            this.iterator = iterator;
+        }
+
+        public boolean hasNext()
+        {
+            return !returned;
+        }
+
+        public RowIterator next()
+        {
+            if (returned)
+                throw new NoSuchElementException();
+
+            returned = true;
+            return iterator;
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void close()
+        {
+            // Once handed out, the partition is closed by its consumer; closing it here too would close it twice.
+            if (!returned)
+                iterator.close();
+        }
+    }
+}