You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:42 UTC

[18/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
new file mode 100644
index 0000000..2c71cf3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IMergeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Static methods to work with unfiltered row iterators.
+ */
+public abstract class UnfilteredRowIterators
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class);
+
+    private UnfilteredRowIterators() {}
+
+    public interface MergeListener
+    {
+        public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+
+        public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions);
+        public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions);
+        public void onMergedCells(Cell mergedCell, Cell[] versions);
+        public void onRowDone();
+
+        public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
+
+        public void close();
+    }
+
+    /**
+     * Returns an iterator that only returns rows with only live content.
+     *
+     * This is mainly used in the CQL layer when we know we don't care about deletion
+     * infos (and since an UnfilteredRowIterator cannot shadow its own data, we know everything
+     * returned isn't shadowed by a tombstone).
+     */
+    public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+    {
+        return new FilteringIterator(iter, nowInSec);
+
+    }
+
+    /**
+     * Returns an iterator that is the result of merging other iterators.
+     */
+    public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec)
+    {
+        assert !iterators.isEmpty();
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        return UnfilteredRowMergeIterator.create(iterators, nowInSec, null);
+    }
+
+    /**
+     * Returns an iterator that is the result of merging other iterators, and using
+     * specific MergeListener.
+     *
+     * Note that this method assumes that there are at least 2 iterators to merge.
+     */
+    public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
+    {
+        assert mergeListener != null;
+        return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
+    }
+
+    /**
+     * Returns an empty unfiltered row iterator for a given partition.
+     */
+    public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    {
+        return new UnfilteredRowIterator()
+        {
+            public CFMetaData metadata()
+            {
+                return cfm;
+            }
+
+            public boolean isReverseOrder()
+            {
+                return isReverseOrder;
+            }
+
+            public PartitionColumns columns()
+            {
+                return PartitionColumns.NONE;
+            }
+
+            public DecoratedKey partitionKey()
+            {
+                return partitionKey;
+            }
+
+            public DeletionTime partitionLevelDeletion()
+            {
+                return DeletionTime.LIVE;
+            }
+
+            public Row staticRow()
+            {
+                return Rows.EMPTY_STATIC_ROW;
+            }
+
+            public RowStats stats()
+            {
+                return RowStats.NO_STATS;
+            }
+
+            public boolean hasNext()
+            {
+                return false;
+            }
+
+            public Unfiltered next()
+            {
+                throw new NoSuchElementException();
+            }
+
+            public void remove()
+            {
+            }
+
+            public void close()
+            {
+            }
+        };
+    }
+
+    public static void digest(UnfilteredRowIterator iterator, MessageDigest digest)
+    {
+        // TODO: we're not computing the digest the same way that old nodes do. This
+        // means we'll have digest mismatches during upgrade. We should pass the messaging version of
+        // the node this is for (which might mean computing the digest last, and won't work
+        // for schema (where we announce the version through gossip to everyone))
+        digest.update(iterator.partitionKey().getKey().duplicate());
+        iterator.partitionLevelDeletion().digest(digest);
+        iterator.columns().digest(digest);
+        FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
+        iterator.staticRow().digest(digest);
+
+        while (iterator.hasNext())
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                ((Row) unfiltered).digest(digest);
+            else
+                ((RangeTombstoneMarker) unfiltered).digest(digest);
+        }
+    }
+
+    /**
+     * Returns an iterator that concatenates two unfiltered row iterators.
+     * This method assumes that both iterators are from the same partition and that the elements from
+     * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterators
+     * makes sense).
+     */
+    public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2)
+    {
+        assert iter1.metadata().cfId.equals(iter2.metadata().cfId)
+            && iter1.partitionKey().equals(iter2.partitionKey())
+            && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
+            && iter1.isReverseOrder() == iter2.isReverseOrder()
+            && iter1.columns().equals(iter2.columns())
+            && iter1.staticRow().equals(iter2.staticRow());
+
+        return new AbstractUnfilteredRowIterator(iter1.metadata(),
+                                        iter1.partitionKey(),
+                                        iter1.partitionLevelDeletion(),
+                                        iter1.columns(),
+                                        iter1.staticRow(),
+                                        iter1.isReverseOrder(),
+                                        iter1.stats())
+        {
+            protected Unfiltered computeNext()
+            {
+                if (iter1.hasNext())
+                    return iter1.next();
+
+                return iter2.hasNext() ? iter2.next() : endOfData();
+            }
+
+            @Override
+            public void close()
+            {
+                try
+                {
+                    iter1.close();
+                }
+                finally
+                {
+                    iter2.close();
+                }
+            }
+        };
+    }
+
+    public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
+    {
+        return new WrappingUnfilteredRowIterator(iterator)
+        {
+            private final CloningRow cloningRow = new CloningRow();
+            private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size());
+
+            public Unfiltered next()
+            {
+                Unfiltered next = super.next();
+                return next.kind() == Unfiltered.Kind.ROW
+                     ? cloningRow.setTo((Row)next)
+                     : clone((RangeTombstoneMarker)next);
+            }
+
+            private RangeTombstoneMarker clone(RangeTombstoneMarker marker)
+            {
+                markerBuilder.reset();
+
+                RangeTombstone.Bound bound = marker.clustering();
+                for (int i = 0; i < bound.size(); i++)
+                    markerBuilder.writeClusteringValue(allocator.clone(bound.get(i)));
+                markerBuilder.writeBoundKind(bound.kind());
+                if (marker.isBoundary())
+                {
+                    RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+                    markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime());
+                }
+                else
+                {
+                    markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime());
+                }
+                markerBuilder.endOfMarker();
+                return markerBuilder.build();
+            }
+
+            class CloningRow extends WrappingRow
+            {
+                private final CloningClustering cloningClustering = new CloningClustering();
+                private final CloningCell cloningCell = new CloningCell();
+
+                protected Cell filterCell(Cell cell)
+                {
+                    return cloningCell.setTo(cell);
+                }
+
+                @Override
+                public Clustering clustering()
+                {
+                    return cloningClustering.setTo(super.clustering());
+                }
+            }
+
+            class CloningClustering extends Clustering
+            {
+                private Clustering wrapped;
+
+                public Clustering setTo(Clustering wrapped)
+                {
+                    this.wrapped = wrapped;
+                    return this;
+                }
+
+                public int size()
+                {
+                    return wrapped.size();
+                }
+
+                public ByteBuffer get(int i)
+                {
+                    ByteBuffer value = wrapped.get(i);
+                    return value == null ? null : allocator.clone(value);
+                }
+
+                public ByteBuffer[] getRawValues()
+                {
+                    throw new UnsupportedOperationException();
+                }
+            }
+
+            class CloningCell extends AbstractCell
+            {
+                private Cell wrapped;
+
+                public Cell setTo(Cell wrapped)
+                {
+                    this.wrapped = wrapped;
+                    return this;
+                }
+
+                public ColumnDefinition column()
+                {
+                    return wrapped.column();
+                }
+
+                public boolean isCounterCell()
+                {
+                    return wrapped.isCounterCell();
+                }
+
+                public ByteBuffer value()
+                {
+                    return allocator.clone(wrapped.value());
+                }
+
+                public LivenessInfo livenessInfo()
+                {
+                    return wrapped.livenessInfo();
+                }
+
+                public CellPath path()
+                {
+                    CellPath path = wrapped.path();
+                    if (path == null)
+                        return null;
+
+                    assert path.size() == 1;
+                    return CellPath.create(allocator.clone(path.get(0)));
+                }
+            }
+        };
+    }
+
+    /**
+     * Turns the given iterator into an update.
+     *
+     * Warning: this method does not close the provided iterator, it is up to
+     * the caller to close it.
+     */
+    public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator)
+    {
+        PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
+
+        update.addPartitionDeletion(iterator.partitionLevelDeletion());
+
+        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+            iterator.staticRow().copyTo(update.staticWriter());
+
+        while (iterator.hasNext())
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                ((Row) unfiltered).copyTo(update.writer());
+            else
+                ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder()));
+        }
+
+        return update;
+    }
+
+    /**
+     * Validate that the data of the provided iterator is valid, that is that the values
+     * it contains are valid for the type they represent, and more generally that the
+     * infos stored are sensible.
+     *
+     * This is mainly used by scrubber to detect problems in sstables.
+     *
+     * @param iterator the partition to check.
+     * @param filename the name of the file the data is coming from.
+     * @return an iterator that returns the same data as {@code iterator} but that
+     * checks said data and throws a {@code CorruptSSTableException} if it detects
+     * invalid data.
+     */
+    public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
+    {
+        return new WrappingUnfilteredRowIterator(iterator)
+        {
+            public Unfiltered next()
+            {
+                Unfiltered next = super.next();
+                try
+                {
+                    next.validateData(metadata());
+                    return next;
+                }
+                catch (MarshalException me)
+                {
+                    throw new CorruptSSTableException(me, filename);
+                }
+            }
+        };
+    }
+
+    /**
+     * Convert all expired cells to equivalent tombstones.
+     * <p>
+     * Once a cell expires, it acts exactly as a tombstone until it is purged. In particular, that
+     * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to
+     * replace the expired cell by an equivalent tombstone (that has no value).
+     *
+     * @param iterator the iterator in which to convert expired cells.
+     * @param nowInSec the current time to use to decide if a cell is expired.
+     * @return an iterator that returns the same data as {@code iterator} but with all expired cells converted
+     * to equivalent tombstones.
+     */
+    public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec)
+    {
+        return new FilteringRowIterator(iterator)
+        {
+            protected FilteringRow makeRowFilter()
+            {
+                return new FilteringRow()
+                {
+                    @Override
+                    protected Cell filterCell(Cell cell)
+                    {
+                        Cell filtered = super.filterCell(cell);
+                        if (filtered == null)
+                            return null;
+
+                        LivenessInfo info = filtered.livenessInfo();
+                        if (info.hasTTL() && !filtered.isLive(nowInSec))
+                        {
+                            // The column is now expired, we can safely return a simple tombstone. Note that as long as the expiring
+                            // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility
+                            // to repair. See discussion at
+                            // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+                            return Cells.create(filtered.column(),
+                                                filtered.isCounterCell(),
+                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()),
+                                                filtered.path());
+                        }
+                        else
+                        {
+                            return filtered;
+                        }
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Wraps the provided iterator so it logs the returned elements for debugging purposes.
+     * <p>
+     * Note that this is only meant for debugging as this can produce a very large amount of
+     * logging at INFO.
+     */
+    public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails)
+    {
+        CFMetaData metadata = iterator.metadata();
+        logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}",
+                    id,
+                    metadata.ksName,
+                    metadata.cfName,
+                    metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                    iterator.isReverseOrder(),
+                    iterator.partitionLevelDeletion().markedForDeleteAt());
+
+        return new WrappingUnfilteredRowIterator(iterator)
+        {
+            @Override
+            public Row staticRow()
+            {
+                Row row = super.staticRow();
+                if (!row.isEmpty())
+                    logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
+                return row;
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered next = super.next();
+                if (next.kind() == Unfiltered.Kind.ROW)
+                    logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails));
+                else
+                    logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata()));
+                return next;
+            }
+        };
+    }
+
+    /**
+     * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface.
+     */
+    private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator
+    {
+        private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator;
+        private final MergeListener listener;
+
+        private UnfilteredRowMergeIterator(CFMetaData metadata,
+                                           List<UnfilteredRowIterator> iterators,
+                                           PartitionColumns columns,
+                                           DeletionTime partitionDeletion,
+                                           int nowInSec,
+                                           boolean reversed,
+                                           MergeListener listener)
+        {
+            super(metadata,
+                  iterators.get(0).partitionKey(),
+                  partitionDeletion,
+                  columns,
+                  mergeStaticRows(metadata, iterators, columns.statics, nowInSec, listener, partitionDeletion),
+                  reversed,
+                  mergeStats(iterators));
+
+            this.listener = listener;
+            this.mergeIterator = MergeIterator.get(iterators,
+                                                   reversed ? metadata.comparator.reversed() : metadata.comparator,
+                                                   new MergeReducer(metadata, iterators.size(), reversed, nowInSec));
+        }
+
+        private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
+        {
+            try
+            {
+                checkForInvalidInput(iterators);
+                return new UnfilteredRowMergeIterator(iterators.get(0).metadata(),
+                                                      iterators,
+                                                      collectColumns(iterators),
+                                                      collectPartitionLevelDeletion(iterators, listener),
+                                                      nowInSec,
+                                                      iterators.get(0).isReverseOrder(),
+                                                      listener);
+            }
+            catch (RuntimeException | Error e)
+            {
+                try
+                {
+                    FBUtilities.closeAll(iterators);
+                }
+                catch (Exception suppressed)
+                {
+                    e.addSuppressed(suppressed);
+                }
+                throw e;
+            }
+        }
+
+        @SuppressWarnings("resource") // We're not really creating any resource here
+        private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators)
+        {
+            if (iterators.isEmpty())
+                return;
+
+            UnfilteredRowIterator first = iterators.get(0);
+            for (int i = 1; i < iterators.size(); i++)
+            {
+                UnfilteredRowIterator iter = iterators.get(i);
+                assert first.metadata().cfId.equals(iter.metadata().cfId);
+                assert first.partitionKey().equals(iter.partitionKey());
+                assert first.isReverseOrder() == iter.isReverseOrder();
+            }
+        }
+
+        @SuppressWarnings("resource") // We're not really creating any resource here
+        private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener)
+        {
+            DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()];
+
+            DeletionTime delTime = DeletionTime.LIVE;
+            for (int i = 0; i < iterators.size(); i++)
+            {
+                UnfilteredRowIterator iter = iterators.get(i);
+                DeletionTime iterDeletion = iter.partitionLevelDeletion();
+                if (listener != null)
+                    versions[i] = iterDeletion;
+                if (!delTime.supersedes(iterDeletion))
+                    delTime = iterDeletion;
+            }
+            if (listener != null && !delTime.isLive())
+                listener.onMergePartitionLevelDeletion(delTime, versions);
+            return delTime;
+        }
+
+        private static Row mergeStaticRows(CFMetaData metadata,
+                                           List<UnfilteredRowIterator> iterators,
+                                           Columns columns,
+                                           int nowInSec,
+                                           MergeListener listener,
+                                           DeletionTime partitionDeletion)
+        {
+            if (columns.isEmpty())
+                return Rows.EMPTY_STATIC_ROW;
+
+            Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener);
+            for (int i = 0; i < iterators.size(); i++)
+                merger.add(i, iterators.get(i).staticRow());
+
+            // Note that we should call 'takeAlias' on the result in theory, but we know that we
+            // won't reuse the merger and so it's ok not to.
+            Row merged = merger.merge(partitionDeletion);
+            return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
+        }
+
+        private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
+        {
+            PartitionColumns first = iterators.get(0).columns();
+            Columns statics = first.statics;
+            Columns regulars = first.regulars;
+            for (int i = 1; i < iterators.size(); i++)
+            {
+                PartitionColumns cols = iterators.get(i).columns();
+                statics = statics.mergeTo(cols.statics);
+                regulars = regulars.mergeTo(cols.regulars);
+            }
+            return statics == first.statics && regulars == first.regulars
+                 ? first
+                 : new PartitionColumns(statics, regulars);
+        }
+
+        private static RowStats mergeStats(List<UnfilteredRowIterator> iterators)
+        {
+            RowStats stats = RowStats.NO_STATS;
+            for (UnfilteredRowIterator iter : iterators)
+                stats = stats.mergeWith(iter.stats());
+            return stats;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            while (mergeIterator.hasNext())
+            {
+                Unfiltered merged = mergeIterator.next();
+                if (merged != null)
+                    return merged;
+            }
+            return endOfData();
+        }
+
+        public void close()
+        {
+            // This will close the input iterators
+            FileUtils.closeQuietly(mergeIterator);
+
+            if (listener != null)
+                listener.close();
+        }
+
+        /**
+         * Specific reducer for merge operations that rewrites the same reusable
+         * row every time. This also skips cells shadowed by range tombstones when writing.
+         */
+        private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
+        {
+            private Unfiltered.Kind nextKind;
+
+            private final Row.Merger rowMerger;
+            private final RangeTombstoneMarker.Merger markerMerger;
+
+            private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec)
+            {
+                this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener);
+                this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener);
+            }
+
+            @Override
+            public boolean trivialReduceIsTrivial()
+            {
+                return listener == null;
+            }
+
+            public void reduce(int idx, Unfiltered current)
+            {
+                nextKind = current.kind();
+                if (nextKind == Unfiltered.Kind.ROW)
+                    rowMerger.add(idx, (Row)current);
+                else
+                    markerMerger.add(idx, (RangeTombstoneMarker)current);
+            }
+
+            protected Unfiltered getReduced()
+            {
+                return nextKind == Unfiltered.Kind.ROW
+                     ? rowMerger.merge(markerMerger.activeDeletion())
+                     : markerMerger.merge();
+            }
+
+            protected void onKeyChange()
+            {
+                if (nextKind == Unfiltered.Kind.ROW)
+                    rowMerger.clear();
+                else
+                    markerMerger.clear();
+            }
+        }
+    }
+
+    private static class FilteringIterator extends AbstractIterator<Row> implements RowIterator
+    {
+        private final UnfilteredRowIterator iter;
+        private final int nowInSec;
+        private final TombstoneFilteringRow filter;
+
+        public FilteringIterator(UnfilteredRowIterator iter, int nowInSec)
+        {
+            this.iter = iter;
+            this.nowInSec = nowInSec;
+            this.filter = new TombstoneFilteringRow(nowInSec);
+        }
+
+        public CFMetaData metadata()
+        {
+            return iter.metadata();
+        }
+
+        public boolean isReverseOrder()
+        {
+            return iter.isReverseOrder();
+        }
+
+        public PartitionColumns columns()
+        {
+            return iter.columns();
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return iter.partitionKey();
+        }
+
+        public Row staticRow()
+        {
+            Row row = iter.staticRow();
+            return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row);
+        }
+
+        protected Row computeNext()
+        {
+            while (iter.hasNext())
+            {
+                Unfiltered next = iter.next();
+                if (next.kind() != Unfiltered.Kind.ROW)
+                    continue;
+
+                Row row = filter.setTo((Row)next);
+                if (!row.isEmpty())
+                    return row;
+            }
+            return endOfData();
+        }
+
+        public void close()
+        {
+            iter.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
new file mode 100644
index 0000000..a5a0c75
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -0,0 +1,706 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Serialize/deserialize a single Unfiltered for the intra-node protocol.
+ *
+ * The encode format for an unfiltered is <flags>(<row>|<marker>) where:
+ *
+ *   <flags> is a byte whose bits are flags. The rightmost 1st bit is only
+ *       set to indicate the end of the partition. The 2nd bit indicates
+ *       whether the remainder is a range tombstone marker (otherwise it's a row).
+ *       If it's a row then the 3rd bit indicates if it's static, the 4th bit
+ *       indicates the presence of a row timestamp, the 5th the presence of a row
+ *       ttl, the 6th the presence of row deletion and the 7th indicates the
+ *       presence of complex deletion times.
+ *   <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
+ *       <clustering> is the row clustering as serialized by
+ *       {@code Clustering.serializer}. Note that static row are an exception and
+ *       don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
+ *       whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
+ *       complex ones.  There are actually 2 slightly different possible layouts for those
+ *       cells: a dense one and a sparse one. Which one is used depends on the serialization
+ *       header and more precisely on {@link SerializationHeader#useSparseColumnLayout(boolean)}:
+ *         1) in the dense layout, there will be as many <sci> and <ccj> as there are columns
+ *            in the serialization header. Each simple column <sci> will simply be a <cell>
+ *            (which might have no value, see below), while each <ccj> will be
+ *             [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
+ *             this complex column (if flags indicates it present), <celln> are the <cell>
+ *             for this complex column and <emptyCell> is a last cell that will have no value
+ *             to indicate the end of this column.
+ *         2) in the sparse layout, there won't be "empty" cells, i.e. only the column that
+ *            actually have a cell are represented. For that, each <sci> and <ccj> start
+ *            by a 2 byte index that points to the column in the header it belongs to. After
+ *            that, each <sci> and <ccj> is the same than for the dense layout. But contrarily
+ *            to the dense layout we won't know how many elements are serialized so a 2 byte
+ *            marker with a value of -1 will indicates the end of the row.
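+ *   <marker> is <bound><deletion>[<deletion>] where <bound> is the marker bound as serialized
+ *       by {@code RangeTombstone.Bound.serializer} and <deletion> is the marker deletion
+ *       time. A boundary marker carries two deletion times: its end deletion time
+ *       followed by its start deletion time.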
+ *
+ *   <cell> A cell starts with a 1 byte <flag>. The rightmost 1st bit indicates
+ *       if there is actually a value for this cell. If this flag is unset,
+ *       nothing more follows for the cell. The 2nd and third flag indicates if
+ *       it's a deleted or expiring cell. The 4th flag indicates if the value
+ *       is empty or not. The 5th and 6th indicates if the timestamp and ttl/
+ *       localDeletionTime for the cell are the same than the row one (if that
+ *       is the case, those are not repeated for the cell). Then follows the <value>
+ *       (unless it's marked empty in the flag) and a delta-encoded long <timestamp>
+ *       (unless the flag tells to use the row level one).
+ *       Then if it's a deleted or expiring cell a delta-encoded int <localDelTime>
+ *       and if it's expiring a delta-encoded int <ttl> (unless it's an expiring cell
+ *       and the ttl and localDeletionTime are indicated by the flags to be the same
+ *       than the row ones, in which case none of those appears).
+ */
+public class UnfilteredSerializer
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredSerializer.class);
+
+    public static final UnfilteredSerializer serializer = new UnfilteredSerializer(); // shared instance
+
+    // Unfiltered flags
+    private final static int END_OF_PARTITION     = 0x01; // no more unfiltered follows for the partition
+    private final static int IS_MARKER            = 0x02; // range tombstone marker (otherwise a row)
+    // For rows
+    private final static int IS_STATIC            = 0x04; // static row (no clustering is serialized)
+    private final static int HAS_TIMESTAMP        = 0x08; // a row timestamp follows
+    private final static int HAS_TTL              = 0x10; // a row ttl and local deletion time follow
+    private final static int HAS_DELETION         = 0x20; // a row deletion follows
+    private final static int HAS_COMPLEX_DELETION = 0x40; // complex columns carry a deletion time
+
+    // Cell flags
+    private final static int PRESENCE_MASK     = 0x01; // unset: nothing follows the flags byte
+    private final static int DELETION_MASK     = 0x02; // deleted cell
+    private final static int EXPIRATION_MASK   = 0x04; // expiring cell
+    private final static int EMPTY_VALUE_MASK  = 0x08; // no value is serialized
+    private final static int USE_ROW_TIMESTAMP = 0x10; // timestamp is the row one, not repeated
+    private final static int USE_ROW_TTL       = 0x20; // ttl/local deletion time are the row ones, not repeated
+
+    public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
+    throws IOException
+    {
+        if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+        {
+            serialize((RangeTombstoneMarker) unfiltered, header, out, version);
+        }
+        else
+        {
+            serialize((Row) unfiltered, header, out, version);
+        }
+    }
+
+    /**
+     * Writes a row: the flags byte, the clustering (not written for static rows), the
+     * optional row timestamp, the optional ttl followed by the local deletion time, the
+     * optional row deletion and then every simple column followed by every complex column
+     * of the header. In the sparse layout, a short of -1 is written last to close the row.
+     */
+    public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version)
+    throws IOException
+    {
+        boolean isStatic = row.isStatic();
+
+        LivenessInfo rowLiveness = row.primaryKeyLivenessInfo();
+        DeletionTime rowDeletion = row.deletion();
+        boolean hasComplexDeletion = row.hasComplexDeletion();
+
+        boolean hasTimestamp = rowLiveness.hasTimestamp();
+        boolean hasTTL = rowLiveness.hasTTL();
+        boolean hasDeletion = !rowDeletion.isLive();
+
+        int flags = (isStatic ? IS_STATIC : 0)
+                  | (hasTimestamp ? HAS_TIMESTAMP : 0)
+                  | (hasTTL ? HAS_TTL : 0)
+                  | (hasDeletion ? HAS_DELETION : 0)
+                  | (hasComplexDeletion ? HAS_COMPLEX_DELETION : 0);
+
+        out.writeByte((byte)flags);
+        if (!isStatic)
+            Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes());
+
+        if (hasTimestamp)
+            out.writeLong(header.encodeTimestamp(rowLiveness.timestamp()));
+
+        if (hasTTL)
+        {
+            out.writeInt(header.encodeTTL(rowLiveness.ttl()));
+            out.writeInt(header.encodeDeletionTime(rowLiveness.localDeletionTime()));
+        }
+
+        if (hasDeletion)
+            UnfilteredRowIteratorSerializer.writeDelTime(rowDeletion, header, out);
+
+        Columns columns = header.columns(isStatic);
+        int simpleCount = columns.simpleColumnCount();
+        boolean useSparse = header.useSparseColumnLayout(isStatic);
+        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
+
+        // The column index runs over the simple columns first, then over the complex ones.
+        int idx = 0;
+        for (; idx < simpleCount; idx++)
+            writeSimpleColumn(idx, cells.next(columns.getSimple(idx)), header, out, rowLiveness, useSparse);
+
+        for (; idx < columns.columnCount(); idx++)
+            writeComplexColumn(idx, cells.next(columns.getComplex(idx - simpleCount)), hasComplexDeletion, header, out, rowLiveness, useSparse);
+
+        // End of row marker, only needed when the number of serialized columns is not fixed.
+        if (useSparse)
+            out.writeShort(-1);
+    }
+
+    /**
+     * Writes the simple column at index {@code idx} of the header. In the sparse layout a
+     * column without data is not written at all and a present one is prefixed by its index;
+     * in the dense layout a column without data is written as an absent cell.
+     */
+    private void writeSimpleColumn(int idx, ColumnData data, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+    throws IOException
+    {
+        if (data == null)
+        {
+            if (!useSparse)
+                writeCell(null, header, out, rowLiveness);
+            return;
+        }
+
+        if (useSparse)
+            out.writeShort(idx);
+
+        writeCell(data.cell(), header, out, rowLiveness);
+    }
+
+    /**
+     * Writes the complex column at index {@code idx} of the header: the column index (sparse
+     * layout only), the complex deletion (only if {@code hasComplexDeletion}), every cell
+     * of the column and finally an absent cell that terminates the column. In the sparse
+     * layout, a column with neither cells nor a complex deletion is not written at all.
+     */
+    private void writeComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+    throws IOException
+    {
+        Iterator<Cell> columnCells;
+        DeletionTime complexDeletion;
+        if (data == null)
+        {
+            columnCells = null;
+            complexDeletion = DeletionTime.LIVE;
+        }
+        else
+        {
+            columnCells = data.cells();
+            complexDeletion = data.complexDeletion();
+        }
+
+        if (useSparse)
+        {
+            assert hasComplexDeletion || complexDeletion.isLive();
+            if (columnCells == null && complexDeletion.isLive())
+                return;
+
+            out.writeShort(idx);
+        }
+
+        if (hasComplexDeletion)
+            UnfilteredRowIteratorSerializer.writeDelTime(complexDeletion, header, out);
+
+        while (columnCells != null && columnCells.hasNext())
+            writeCell(columnCells.next(), header, out, rowLiveness);
+
+        // An absent cell marks the end of the column.
+        writeCell(null, header, out, rowLiveness);
+    }
+
+    public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
+    throws IOException
+    {
+        out.writeByte((byte)IS_MARKER);
+        RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes());
+
+        if (marker.isBoundary())
+        {
+            RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+            UnfilteredRowIteratorSerializer.writeDelTime(bm.endDeletionTime(), header, out);
+            UnfilteredRowIteratorSerializer.writeDelTime(bm.startDeletionTime(), header, out);
+        }
+        else
+        {
+            UnfilteredRowIteratorSerializer.writeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime(), header, out);
+        }
+    }
+
+    /**
+     * Returns the serialized size of {@code unfiltered}, as computed by the range tombstone
+     * marker or the row specific method depending on its kind.
+     */
+    public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version, TypeSizes sizes)
+    {
+        if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+            return serializedSize((RangeTombstoneMarker) unfiltered, header, version, sizes);
+
+        return serializedSize((Row) unfiltered, header, version, sizes);
+    }
+
+    /**
+     * Computes the size of a row, adding up the same elements as those written by
+     * {@code serialize(Row, ...)}: flags byte, clustering (non static rows only), optional
+     * timestamp, ttl/local deletion time and row deletion, every column and, in the sparse
+     * layout, the final end of row short.
+     */
+    public long serializedSize(Row row, SerializationHeader header, int version, TypeSizes sizes)
+    {
+        long total = 1; // flags
+
+        boolean isStatic = row.isStatic();
+        LivenessInfo rowLiveness = row.primaryKeyLivenessInfo();
+        DeletionTime rowDeletion = row.deletion();
+        boolean hasComplexDeletion = row.hasComplexDeletion();
+
+        if (!isStatic)
+            total += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes(), sizes);
+
+        if (rowLiveness.hasTimestamp())
+            total += sizes.sizeof(header.encodeTimestamp(rowLiveness.timestamp()));
+
+        if (rowLiveness.hasTTL())
+        {
+            total += sizes.sizeof(header.encodeTTL(rowLiveness.ttl()));
+            total += sizes.sizeof(header.encodeDeletionTime(rowLiveness.localDeletionTime()));
+        }
+
+        if (!rowDeletion.isLive())
+            total += UnfilteredRowIteratorSerializer.delTimeSerializedSize(rowDeletion, header, sizes);
+
+        Columns columns = header.columns(isStatic);
+        int simpleCount = columns.simpleColumnCount();
+        boolean useSparse = header.useSparseColumnLayout(isStatic);
+        SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
+
+        // The column index runs over the simple columns first, then over the complex ones.
+        int idx = 0;
+        for (; idx < simpleCount; idx++)
+            total += sizeOfSimpleColumn(idx, cells.next(columns.getSimple(idx)), header, sizes, rowLiveness, useSparse);
+
+        for (; idx < columns.columnCount(); idx++)
+            total += sizeOfComplexColumn(idx, cells.next(columns.getComplex(idx - simpleCount)), hasComplexDeletion, header, sizes, rowLiveness, useSparse);
+
+        // End of row marker of the sparse layout
+        if (useSparse)
+            total += sizes.sizeof((short)-1);
+
+        return total;
+    }
+
+    /**
+     * Size of a simple column as written by {@code writeSimpleColumn}: nothing for an absent
+     * column in the sparse layout, otherwise the optional column index plus the cell.
+     */
+    private long sizeOfSimpleColumn(int idx, ColumnData data, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse)
+    {
+        if (data == null)
+            return useSparse ? 0 : sizeOfCell(null, header, sizes, rowLiveness);
+
+        long indexSize = 0;
+        if (useSparse)
+            indexSize += sizes.sizeof((short)idx);
+
+        return indexSize + sizeOfCell(data.cell(), header, sizes, rowLiveness);
+    }
+
+    /**
+     * Size of a complex column as written by {@code writeComplexColumn}: nothing for a column
+     * with neither cells nor complex deletion in the sparse layout, otherwise the optional
+     * column index, the optional complex deletion, every cell and the terminating absent cell.
+     */
+    private long sizeOfComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse)
+    {
+        Iterator<Cell> columnCells;
+        DeletionTime complexDeletion;
+        if (data == null)
+        {
+            columnCells = null;
+            complexDeletion = DeletionTime.LIVE;
+        }
+        else
+        {
+            columnCells = data.cells();
+            complexDeletion = data.complexDeletion();
+        }
+
+        long total = 0;
+        if (useSparse)
+        {
+            assert hasComplexDeletion || complexDeletion.isLive();
+            if (columnCells == null && complexDeletion.isLive())
+                return total;
+
+            total += sizes.sizeof((short)idx);
+        }
+
+        if (hasComplexDeletion)
+            total += UnfilteredRowIteratorSerializer.delTimeSerializedSize(complexDeletion, header, sizes);
+
+        while (columnCells != null && columnCells.hasNext())
+            total += sizeOfCell(columnCells.next(), header, sizes, rowLiveness);
+
+        // Terminating absent cell
+        return total + sizeOfCell(null, header, sizes, rowLiveness);
+    }
+
+    public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version, TypeSizes sizes)
+    {
+        long size = 1 // flags
+                  + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes(), sizes);
+
+        if (marker.isBoundary())
+        {
+            RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
+            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.endDeletionTime(), header, sizes);
+            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.startDeletionTime(), header, sizes);
+        }
+        else
+        {
+           size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(((RangeTombstoneBoundMarker)marker).deletionTime(), header, sizes);
+        }
+        return size;
+    }
+
+    /**
+     * Writes the single flags byte that signals the end of a partition.
+     */
+    public void writeEndOfPartition(DataOutputPlus out) throws IOException
+    {
+        out.writeByte((byte)END_OF_PARTITION);
+    }
+
+    /**
+     * Size of the end of partition marker, which is a lone flags byte.
+     */
+    public long serializedSizeEndOfPartition(TypeSizes sizes)
+    {
+        final long flagsSize = 1;
+        return flagsSize;
+    }
+
+    public Unfiltered.Kind deserialize(DataInput in,
+                                 SerializationHeader header,
+                                 SerializationHelper helper,
+                                 Row.Writer rowWriter,
+                                 RangeTombstoneMarker.Writer markerWriter)
+    throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        if (isEndOfPartition(flags))
+            return null;
+
+        if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+        {
+            RangeTombstone.Bound.Kind kind = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes(), markerWriter);
+            deserializeMarkerBody(in, header, kind.isBoundary(), markerWriter);
+            return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+        }
+        else
+        {
+            assert !isStatic(flags); // deserializeStaticRow should be used for that.
+            Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes(), rowWriter);
+            deserializeRowBody(in, header, helper, flags, rowWriter);
+            return Unfiltered.Kind.ROW;
+        }
+    }
+
+    /**
+     * Reads a static row (flags byte followed by the row body, without clustering) from
+     * {@code in} and returns it.
+     * <p>
+     * The flags read are expected to describe a static row; this is only checked through an
+     * assertion.
+     *
+     * @param in the input to read the static row from.
+     * @param header the serialization header; its static columns are used to build the row.
+     * @param helper the helper passed along to {@code deserializeRowBody}.
+     * @return the deserialized static row.
+     * @throws IOException if reading from {@code in} fails.
+     */
+    public Row deserializeStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper)
+    throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        // Report the offending flags on failure, as skipStaticRow does for the same check.
+        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags;
+        StaticRow.Builder builder = StaticRow.builder(header.columns().statics, true, header.columns().statics.hasCounters());
+        deserializeRowBody(in, header, helper, flags, builder);
+        return builder.build();
+    }
+
+    /**
+     * Reads past a static row: consumes its flags byte (asserted to describe a static row)
+     * and then skips the row body.
+     */
+    public void skipStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        assert !isEndOfPartition(flags)
+            && kind(flags) == Unfiltered.Kind.ROW
+            && isStatic(flags)
+             : "Flags is " + flags;
+        skipRowBody(in, header, helper, flags);
+    }
+
+    /**
+     * Reads the deletion time(s) that follow a marker bound and pushes them to
+     * {@code writer}: two for a boundary (passed in the order they are read), one otherwise.
+     * The marker is then closed through {@code writer.endOfMarker()}.
+     */
+    public void deserializeMarkerBody(DataInput in,
+                                      SerializationHeader header,
+                                      boolean isBoundary,
+                                      RangeTombstoneMarker.Writer writer)
+    throws IOException
+    {
+        if (!isBoundary)
+        {
+            writer.writeBoundDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header));
+        }
+        else
+        {
+            // serialize() writes the end deletion time first and the start one second.
+            DeletionTime endDeletion = UnfilteredRowIteratorSerializer.readDelTime(in, header);
+            DeletionTime startDeletion = UnfilteredRowIteratorSerializer.readDelTime(in, header);
+            writer.writeBoundaryDeletion(endDeletion, startDeletion);
+        }
+        writer.endOfMarker();
+    }
+
+    public void skipMarkerBody(DataInput in, SerializationHeader header, boolean isBoundary) throws IOException
+    {
+        if (isBoundary)
+        {
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+        }
+        else
+        {
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+        }
+    }
+
+    /**
+     * Reads the body of a row, i.e. everything that follows the flags byte and (for non
+     * static rows) the clustering, and pushes it to {@code writer}.
+     * <p>
+     * Elements are read in the order {@code serialize(Row, ...)} writes them: the optional
+     * row timestamp, the optional ttl followed by the local deletion time, the optional row
+     * deletion and then the columns, using either the sparse layout (each column prefixed
+     * by its index in the header, a negative index closing the row) or the dense one (every
+     * simple column then every complex column of the header).
+     *
+     * @param in the input to read from.
+     * @param header the serialization header describing the columns and the layout used.
+     * @param helper the helper used to write liveness info and cells to {@code writer}.
+     * @param flags the row flags byte that was read before calling this method.
+     * @param writer the writer receiving the row content; {@code endOfRow()} is called on it last.
+     * @throws IOException if reading from {@code in} fails or if, in the sparse layout, a
+     * column index does not designate a column of the header.
+     */
+    public void deserializeRowBody(DataInput in,
+                                   SerializationHeader header,
+                                   SerializationHelper helper,
+                                   int flags,
+                                   Row.Writer writer)
+    throws IOException
+    {
+        boolean isStatic = isStatic(flags);
+        boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+        boolean hasTTL = (flags & HAS_TTL) != 0;
+        boolean hasDeletion = (flags & HAS_DELETION) != 0;
+        boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+
+        long timestamp = hasTimestamp ? header.decodeTimestamp(in.readLong()) : LivenessInfo.NO_TIMESTAMP;
+        int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL;
+        int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME;
+        DeletionTime deletion = hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE;
+
+        helper.writePartitionKeyLivenessInfo(writer, timestamp, ttl, localDeletionTime);
+        writer.writeRowDeletion(deletion);
+
+        Columns columns = header.columns(isStatic);
+        if (header.useSparseColumnLayout(isStatic))
+        {
+            int count = columns.columnCount();
+            int simpleCount = columns.simpleColumnCount();
+            int i;
+            // A negative index (serialize() writes -1) marks the end of the row.
+            while ((i = in.readShort()) >= 0)
+            {
+                // Valid indexes are [0, count): i == count must be rejected too, otherwise it
+                // would be used below as an out of range complex column position.
+                if (i >= count)
+                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+                if (i < simpleCount)
+                    readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
+                else
+                    readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, writer);
+            }
+        }
+        else
+        {
+            for (int i = 0; i < columns.simpleColumnCount(); i++)
+                readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
+
+            for (int i = 0; i < columns.complexColumnCount(); i++)
+                readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, writer);
+        }
+
+        writer.endOfRow();
+    }
+
+    /**
+     * Reads the cell of a simple column if {@code helper} includes that column, and skips
+     * over it otherwise.
+     */
+    private void readSimpleColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
+    throws IOException
+    {
+        if (!helper.includes(column))
+        {
+            skipCell(column, in, header);
+            return;
+        }
+
+        readCell(column, in, header, helper, writer);
+    }
+
+    /**
+     * Reads a complex column if {@code helper} includes it (optional complex deletion, then
+     * cells until an absent cell is met), and skips over it otherwise.
+     */
+    private void readComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Writer writer)
+    throws IOException
+    {
+        if (!helper.includes(column))
+        {
+            skipComplexColumn(column, in, header, helper, hasComplexDeletion);
+            return;
+        }
+
+        helper.startOfComplexColumn(column);
+
+        if (hasComplexDeletion)
+            writer.writeComplexDeletion(column, UnfilteredRowIteratorSerializer.readDelTime(in, header));
+
+        // readCell() returns false once it has consumed the terminating absent cell.
+        boolean moreCells;
+        do
+        {
+            moreCells = readCell(column, in, header, helper, writer);
+        }
+        while (moreCells);
+
+        helper.endOfComplexColumn(column);
+    }
+
+    /**
+     * Reads past the body of a row, i.e. everything that follows the flags byte and (for non
+     * static rows) the clustering, without producing anything.
+     * <p>
+     * This consumes the same elements, in the same order, as {@code deserializeRowBody}.
+     *
+     * @param in the input to read from.
+     * @param header the serialization header describing the columns and the layout used.
+     * @param helper the helper passed along when skipping complex columns.
+     * @param flags the row flags byte that was read before calling this method.
+     * @throws IOException if reading from {@code in} fails or if, in the sparse layout, a
+     * column index does not designate a column of the header.
+     */
+    public void skipRowBody(DataInput in, SerializationHeader header, SerializationHelper helper, int flags) throws IOException
+    {
+        boolean isStatic = isStatic(flags);
+        boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+        boolean hasTTL = (flags & HAS_TTL) != 0;
+        boolean hasDeletion = (flags & HAS_DELETION) != 0;
+        boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+
+        // Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
+        // the size we think due to VINT encoding
+        if (hasTimestamp)
+            in.readLong();
+        if (hasTTL)
+        {
+            // ttl and localDeletionTime
+            in.readInt();
+            in.readInt();
+        }
+        if (hasDeletion)
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+
+        Columns columns = header.columns(isStatic);
+        if (header.useSparseColumnLayout(isStatic))
+        {
+            int count = columns.columnCount();
+            int simpleCount = columns.simpleColumnCount();
+            int i;
+            // A negative index (serialize() writes -1) marks the end of the row.
+            while ((i = in.readShort()) >= 0)
+            {
+                // Valid indexes are [0, count): i == count must be rejected too, otherwise it
+                // would be used below as an out of range complex column position.
+                if (i >= count)
+                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+                if (i < simpleCount)
+                    skipCell(columns.getSimple(i), in, header);
+                else
+                    skipComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion);
+            }
+        }
+        else
+        {
+            for (int i = 0; i < columns.simpleColumnCount(); i++)
+                skipCell(columns.getSimple(i), in, header);
+
+            for (int i = 0; i < columns.complexColumnCount(); i++)
+                skipComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion);
+        }
+    }
+
+    /**
+     * Reads past a complex column: its optional complex deletion, then its cells up to and
+     * including the terminating absent cell.
+     */
+    private void skipComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion)
+    throws IOException
+    {
+        if (hasComplexDeletion)
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+
+        // skipCell() returns false once it has consumed the terminating absent cell.
+        boolean moreCells;
+        do
+        {
+            moreCells = skipCell(column, in, header);
+        }
+        while (moreCells);
+    }
+
+    public static boolean isEndOfPartition(int flags)
+    {
+        return (flags & END_OF_PARTITION) != 0;
+    }
+
+    /**
+     * The kind of unfiltered described by {@code flags}: a range tombstone marker if the
+     * {@code IS_MARKER} bit is set, a row otherwise.
+     */
+    public static Unfiltered.Kind kind(int flags)
+    {
+        if ((flags & IS_MARKER) == 0)
+            return Unfiltered.Kind.ROW;
+
+        return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+    }
+
+    public static boolean isStatic(int flags)
+    {
+        return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0;
+    }
+
+    /**
+     * Writes a single cell: a flags byte followed, depending on those flags, by the value,
+     * the timestamp, the local deletion time, the ttl and, for cells of complex columns,
+     * the cell path.
+     * <p>
+     * A {@code null} cell is written as a lone 0 byte (presence bit unset), which is what
+     * {@code readCell} and {@code skipCell} interpret as "no cell".
+     *
+     * @param cell the cell to write, or {@code null} to write an absent cell.
+     * @param header the serialization header used to encode timestamps, ttls and deletion
+     * times and to get the type used to write the value.
+     * @param out the output to write to.
+     * @param rowLiveness the liveness info of the row; cell values equal to the row ones are
+     * flagged rather than repeated.
+     * @throws IOException if writing to {@code out} fails.
+     */
+    private void writeCell(Cell cell, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness)
+    throws IOException
+    {
+        if (cell == null)
+        {
+            out.writeByte((byte)0);
+            return;
+        }
+
+        boolean hasValue = cell.value().hasRemaining();
+        boolean isDeleted = cell.isTombstone();
+        boolean isExpiring = cell.isExpiring();
+        // The row level values can only be reused when the row has them and they match exactly.
+        boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
+        boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
+        int flags = PRESENCE_MASK;
+        if (!hasValue)
+            flags |= EMPTY_VALUE_MASK;
+
+        if (isDeleted)
+            flags |= DELETION_MASK;
+        else if (isExpiring)
+            flags |= EXPIRATION_MASK;
+
+        if (useRowTimestamp)
+            flags |= USE_ROW_TIMESTAMP;
+        if (useRowTTL)
+            flags |= USE_ROW_TTL;
+
+        out.writeByte((byte)flags);
+
+        // The order of what follows must match the read order of readCell() and skipCell().
+        if (hasValue)
+            header.getType(cell.column()).writeValue(cell.value(), out);
+
+        if (!useRowTimestamp)
+            out.writeLong(header.encodeTimestamp(cell.livenessInfo().timestamp()));
+
+        if ((isDeleted || isExpiring) && !useRowTTL)
+            out.writeInt(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
+        if (isExpiring && !useRowTTL)
+            out.writeInt(header.encodeTTL(cell.livenessInfo().ttl()));
+
+        // The cell path comes last and only exists for complex columns.
+        if (cell.column().isComplex())
+            cell.column().cellPathSerializer().serialize(cell.path(), out);
+    }
+
+    /**
+     * Size of a cell as written by {@code writeCell}: the flags byte alone for a {@code null}
+     * cell, otherwise the flags byte plus whichever of the value, timestamp, local deletion
+     * time, ttl and cell path get written.
+     */
+    private long sizeOfCell(Cell cell, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness)
+    {
+        long total = 1; // flags
+
+        if (cell == null)
+            return total;
+
+        boolean hasValue = cell.value().hasRemaining();
+        boolean isDeleted = cell.isTombstone();
+        boolean isExpiring = cell.isExpiring();
+        boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
+        boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
+
+        if (hasValue)
+            total += header.getType(cell.column()).writtenLength(cell.value(), sizes);
+
+        if (!useRowTimestamp)
+            total += sizes.sizeof(header.encodeTimestamp(cell.livenessInfo().timestamp()));
+
+        // Neither the local deletion time nor the ttl is serialized when the row ones are used.
+        if (!useRowTTL)
+        {
+            if (isDeleted || isExpiring)
+                total += sizes.sizeof(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
+            if (isExpiring)
+                total += sizes.sizeof(header.encodeTTL(cell.livenessInfo().ttl()));
+        }
+
+        if (cell.column().isComplex())
+            total += cell.column().cellPathSerializer().serializedSize(cell.path(), sizes);
+
+        return total;
+    }
+
+    /**
+     * Reads a single cell of {@code column} from {@code in} and pushes it to {@code writer}
+     * through {@code helper}.
+     *
+     * @param column the column the cell belongs to.
+     * @param in the input to read from.
+     * @param header the serialization header used to decode timestamps, ttls and deletion
+     * times and to get the type used to read the value.
+     * @param helper provides the row level timestamp/ttl/local deletion time when the cell
+     * flags say to reuse them, and decides whether the value can be skipped.
+     * @param writer the writer the cell is written to.
+     * @return {@code false} if the flags byte read has its presence bit unset (nothing more
+     * is read and nothing is written in that case), {@code true} otherwise.
+     * @throws IOException if reading from {@code in} fails.
+     */
+    private boolean readCell(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
+    throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        if ((flags & PRESENCE_MASK) == 0)
+            return false;
+
+        boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
+        boolean isDeleted = (flags & DELETION_MASK) != 0;
+        boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
+        boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
+        boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
+
+        // When the helper allows skipping the value, it is consumed from the input but the
+        // cell is written with an empty value instead.
+        ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        if (hasValue)
+        {
+            if (helper.canSkipValue(column))
+                header.getType(column).skipValue(in);
+            else
+                value = header.getType(column).readValue(in);
+        }
+
+        // Same order as in writeCell(): timestamp, local deletion time, ttl, then cell path.
+        long timestamp = useRowTimestamp ? helper.getRowTimestamp() : header.decodeTimestamp(in.readLong());
+
+        int localDelTime = useRowTTL
+                         ? helper.getRowLocalDeletionTime()
+                         : (isDeleted || isExpiring ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME);
+
+        int ttl = useRowTTL
+                ? helper.getRowTTL()
+                : (isExpiring ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL);
+
+        CellPath path = column.isComplex()
+                      ? column.cellPathSerializer().deserialize(in)
+                      : null;
+
+        // NOTE(review): the meaning of the 'false' argument is defined by SerializationHelper.writeCell, not visible here.
+        helper.writeCell(writer, column, false, value, timestamp, localDelTime, ttl, path);
+
+        return true;
+    }
+
+    /**
+     * Reads past a single cell of {@code column}, consuming the same elements as
+     * {@code readCell} without producing anything.
+     *
+     * @param column the column the cell belongs to.
+     * @param in the input to read from.
+     * @param header the serialization header, used to get the type that skips the value.
+     * @return {@code false} if the flags byte read has its presence bit unset (nothing more
+     * is consumed in that case), {@code true} otherwise.
+     * @throws IOException if reading from {@code in} fails.
+     */
+    private boolean skipCell(ColumnDefinition column, DataInput in, SerializationHeader header)
+    throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        if ((flags & PRESENCE_MASK) == 0)
+            return false;
+
+        boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
+        boolean isDeleted = (flags & DELETION_MASK) != 0;
+        boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
+        boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
+        boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
+
+        if (hasValue)
+            header.getType(column).skipValue(in);
+
+        // timestamp
+        if (!useRowTimestamp)
+            in.readLong();
+
+        // local deletion time
+        if (!useRowTTL && (isDeleted || isExpiring))
+            in.readInt();
+
+        // ttl
+        if (!useRowTTL && isExpiring)
+            in.readInt();
+
+        if (column.isComplex())
+            column.cellPathSerializer().skip(in);
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRow.java b/src/java/org/apache/cassandra/db/rows/WrappingRow.java
new file mode 100644
index 0000000..5a0cc78
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingRow.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Base class for rows that expose a filtered/transformed view of another {@link Row}.
+ * <p>
+ * Accessors delegate to the row installed through {@link #setTo(Row)}. Cells are passed
+ * through {@link #filterCell(Cell)} and deletion times through
+ * {@link #filterDeletionTime(DeletionTime)} before being handed to the caller.
+ * <p>
+ * The wrapper is reusable and allocates a single cell iterator and a single search
+ * iterator: the iterators returned by {@link #iterator()}, {@link #getCells(ColumnDefinition)}
+ * and the cells of complex {@code ColumnData} returned by the search iterator are the same
+ * object, re-targeted on each call. An iterator obtained earlier must therefore not be
+ * used after another of those calls.
+ */
+public abstract class WrappingRow extends AbstractRow
+{
+    /** The row currently being viewed; set through {@link #setTo(Row)}. */
+    protected Row wrapped;
+
+    private final ReusableIterator cellIterator = new ReusableIterator();
+    private final ReusableSearchIterator cellSearchIterator = new ReusableSearchIterator();
+
+    /**
+     * Apply some filtering/transformation on cells. This function
+     * can return {@code null} in which case the cell will be ignored.
+     */
+    protected abstract Cell filterCell(Cell cell);
+
+    /**
+     * Apply some filtering/transformation on deletion times. The default
+     * implementation returns its argument unchanged.
+     */
+    protected DeletionTime filterDeletionTime(DeletionTime deletionTime)
+    {
+        return deletionTime;
+    }
+
+    /**
+     * Filters the data of one column.
+     *
+     * @param data the unfiltered data of a column of the wrapped row
+     * @return the filtered data, or {@code null} if nothing is left: for a simple column
+     * when its cell is filtered out, for a complex column when no cell survives filtering
+     * and the (filtered) complex deletion is live
+     */
+    protected ColumnData filterColumnData(ColumnData data)
+    {
+        if (data.column().isComplex())
+        {
+            Iterator<Cell> rawCells = data.cells();
+            Iterator<Cell> cells = rawCells == null ? null : cellIterator.setTo(rawCells);
+            DeletionTime dt = filterDeletionTime(data.complexDeletion());
+
+            // setTo() always returns the (non-null) reusable iterator, so whether any cell
+            // survives filtering has to be established by probing it. hasNext() only buffers
+            // the first surviving cell, which next() then returns, so nothing is lost.
+            boolean hasCells = cells != null && cells.hasNext();
+            if (!hasCells && dt.isLive())
+                return null;
+
+            return new ColumnData(data.column(), null, hasCells ? cells : Collections.<Cell>emptyIterator(), dt);
+        }
+        else
+        {
+            Cell filtered = filterCell(data.cell());
+            return filtered == null ? null : new ColumnData(data.column(), filtered, null, null);
+        }
+    }
+
+    /**
+     * Makes this object a view over {@code row}.
+     *
+     * @param row the row to wrap
+     * @return this object, to allow chaining
+     */
+    public WrappingRow setTo(Row row)
+    {
+        this.wrapped = row;
+        return this;
+    }
+
+    public Unfiltered.Kind kind()
+    {
+        return Unfiltered.Kind.ROW;
+    }
+
+    public Clustering clustering()
+    {
+        return wrapped.clustering();
+    }
+
+    public Columns columns()
+    {
+        return wrapped.columns();
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return wrapped.primaryKeyLivenessInfo();
+    }
+
+    public DeletionTime deletion()
+    {
+        return wrapped.deletion();
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        // Note that because deletion times can be filtered out/transformed through
+        // filterDeletionTime(), we can't rely on wrapped.hasComplexDeletion().
+        for (int i = 0; i < columns().complexColumnCount(); i++)
+            if (!getDeletion(columns().getComplex(i)).isLive())
+                return true;
+        return false;
+    }
+
+    /**
+     * @return the filtered cell of the wrapped row for {@code c}, or {@code null} if the
+     * wrapped row has none or it is filtered out
+     */
+    public Cell getCell(ColumnDefinition c)
+    {
+        Cell cell = wrapped.getCell(c);
+        return cell == null ? null : filterCell(cell);
+    }
+
+    /**
+     * @return the filtered cell of the wrapped row for {@code c} and {@code path}, or
+     * {@code null} if the wrapped row has none or it is filtered out
+     */
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        Cell cell = wrapped.getCell(c, path);
+        return cell == null ? null : filterCell(cell);
+    }
+
+    /**
+     * @return the shared iterator positioned over the filtered cells of {@code c}, or
+     * {@code null} if the wrapped row has no cells for it or none survives filtering
+     */
+    public Iterator<Cell> getCells(ColumnDefinition c)
+    {
+        Iterator<Cell> cells = wrapped.getCells(c);
+        if (cells == null)
+            return null;
+
+        cellIterator.setTo(cells);
+        return cellIterator.hasNext() ? cellIterator : null;
+    }
+
+    public DeletionTime getDeletion(ColumnDefinition c)
+    {
+        return filterDeletionTime(wrapped.getDeletion(c));
+    }
+
+    public Iterator<Cell> iterator()
+    {
+        return cellIterator.setTo(wrapped.iterator());
+    }
+
+    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return cellSearchIterator.setTo(wrapped.searchIterator());
+    }
+
+    /**
+     * Copies the filtered content of this view into a new row object (a static row or a
+     * {@code ReusableRow} depending on {@code isStatic()}), so the result no longer
+     * depends on this reusable wrapper.
+     */
+    public Row takeAlias()
+    {
+        boolean isCounter = columns().hasCounters();
+        if (isStatic())
+        {
+            StaticRow.Builder builder = StaticRow.builder(columns(), true, isCounter);
+            copyTo(builder);
+            return builder.build();
+        }
+        else
+        {
+            ReusableRow copy = new ReusableRow(clustering().size(), columns(), true, isCounter);
+            copyTo(copy.writer());
+            return copy;
+        }
+    }
+
+    /**
+     * Iterator over the cells of an underlying iterator that survive {@link #filterCell},
+     * which can be re-targeted to a new underlying iterator through {@link #setTo}.
+     */
+    private class ReusableIterator extends UnmodifiableIterator<Cell>
+    {
+        private Iterator<Cell> iter;
+        // The next cell to return, computed by hasNext(); null if not computed yet.
+        private Cell next;
+
+        public ReusableIterator setTo(Iterator<Cell> iter)
+        {
+            this.iter = iter;
+            this.next = null;
+            return this;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            // Skip over the cells filterCell() rejects (i.e. maps to null).
+            while (next == null && iter.hasNext())
+                next = filterCell(iter.next());
+            return next != null;
+        }
+
+        @Override
+        public Cell next()
+        {
+            if (next == null && !hasNext())
+                throw new NoSuchElementException();
+
+            Cell result = next;
+            next = null;
+            return result;
+        }
+    }
+
+    /**
+     * Search iterator that passes every result of an underlying search iterator through
+     * {@link #filterColumnData}, and can be re-targeted through {@link #setTo}.
+     */
+    private class ReusableSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
+    {
+        private SearchIterator<ColumnDefinition, ColumnData> iter;
+
+        public ReusableSearchIterator setTo(SearchIterator<ColumnDefinition, ColumnData> iter)
+        {
+            this.iter = iter;
+            return this;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iter.hasNext();
+        }
+
+        @Override
+        public ColumnData next(ColumnDefinition column)
+        {
+            ColumnData data = iter.next(column);
+            if (data == null)
+                return null;
+
+            return filterColumnData(data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
new file mode 100644
index 0000000..8847a47
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Skeleton for row iterators that decorate another {@link RowIterator}.
+ * <p>
+ * Every method forwards to the wrapped iterator, so a concrete subclass only has
+ * to override the methods whose behavior it wants to change.
+ */
+public abstract class WrappingRowIterator extends UnmodifiableIterator<Row> implements RowIterator
+{
+    /** The iterator every call is forwarded to. */
+    protected final RowIterator wrapped;
+
+    protected WrappingRowIterator(RowIterator wrapped)
+    {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public CFMetaData metadata()
+    {
+        return wrapped.metadata();
+    }
+
+    @Override
+    public boolean isReverseOrder()
+    {
+        return wrapped.isReverseOrder();
+    }
+
+    @Override
+    public PartitionColumns columns()
+    {
+        return wrapped.columns();
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return wrapped.partitionKey();
+    }
+
+    @Override
+    public Row staticRow()
+    {
+        return wrapped.staticRow();
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        return wrapped.hasNext();
+    }
+
+    @Override
+    public Row next()
+    {
+        return wrapped.next();
+    }
+
+    /** Closes the wrapped iterator. */
+    @Override
+    public void close()
+    {
+        wrapped.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
new file mode 100644
index 0000000..680e502
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Skeleton for unfiltered row iterators that decorate another
+ * {@link UnfilteredRowIterator}.
+ * <p>
+ * Every method forwards to the wrapped iterator, so a concrete subclass only has
+ * to override the methods whose behavior it wants to change.
+ */
+public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+{
+    /** The iterator every call is forwarded to. */
+    protected final UnfilteredRowIterator wrapped;
+
+    protected WrappingUnfilteredRowIterator(UnfilteredRowIterator wrapped)
+    {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public CFMetaData metadata()
+    {
+        return wrapped.metadata();
+    }
+
+    @Override
+    public PartitionColumns columns()
+    {
+        return wrapped.columns();
+    }
+
+    @Override
+    public boolean isReverseOrder()
+    {
+        return wrapped.isReverseOrder();
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return wrapped.partitionKey();
+    }
+
+    @Override
+    public DeletionTime partitionLevelDeletion()
+    {
+        return wrapped.partitionLevelDeletion();
+    }
+
+    @Override
+    public Row staticRow()
+    {
+        return wrapped.staticRow();
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        return wrapped.hasNext();
+    }
+
+    @Override
+    public Unfiltered next()
+    {
+        return wrapped.next();
+    }
+
+    @Override
+    public RowStats stats()
+    {
+        return wrapped.stats();
+    }
+
+    /** Closes the wrapped iterator. */
+    @Override
+    public void close()
+    {
+        wrapped.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ffc22..e295c68 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -34,8 +34,8 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
     private static final long serialVersionUID = 1L;
     public static final IPartitionerDependentSerializer<AbstractBounds<Token>> tokenSerializer =
             new AbstractBoundsSerializer<Token>(Token.serializer);
-    public static final IPartitionerDependentSerializer<AbstractBounds<RowPosition>> rowPositionSerializer =
-            new AbstractBoundsSerializer<RowPosition>(RowPosition.serializer);
+    public static final IPartitionerDependentSerializer<AbstractBounds<PartitionPosition>> rowPositionSerializer =
+            new AbstractBoundsSerializer<PartitionPosition>(PartitionPosition.serializer);
 
     private enum Type
     {
@@ -112,6 +112,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
     protected abstract String getOpeningString();
     protected abstract String getClosingString();
 
+    /** @return whether the start (left) bound itself is part of this interval. */
+    public abstract boolean isStartInclusive();
+    /** @return whether the end (right) bound itself is part of this interval. */
+    public abstract boolean isEndInclusive();
+
     public abstract AbstractBounds<T> withNewRight(T newRight);
 
     public static class AbstractBoundsSerializer<T extends RingPosition<T>> implements IPartitionerDependentSerializer<AbstractBounds<T>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 4a5a701..b569349 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.dht;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -102,12 +102,22 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
         return "]";
     }
 
+    /** A {@code Bounds} always includes its start bound. */
+    @Override
+    public boolean isStartInclusive()
+    {
+        return true;
+    }
+
+    /** A {@code Bounds} always includes its end bound. */
+    @Override
+    public boolean isEndInclusive()
+    {
+        return true;
+    }
+
     /**
      * Compute a bounds of keys corresponding to a given bounds of token.
      */
-    public static Bounds<RowPosition> makeRowBounds(Token left, Token right)
+    public static Bounds<PartitionPosition> makeRowBounds(Token left, Token right)
     {
-        return new Bounds<RowPosition>(left.minKeyBound(), right.maxKeyBound());
+        return new Bounds<PartitionPosition>(left.minKeyBound(), right.maxKeyBound());
     }
 
     public AbstractBounds<T> withNewRight(T newRight)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 0d37e5c..86af68d 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -90,6 +90,16 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T
         return ")";
     }
 
+    /** An {@code ExcludingBounds} never includes its start bound. */
+    @Override
+    public boolean isStartInclusive()
+    {
+        return false;
+    }
+
+    /** An {@code ExcludingBounds} never includes its end bound. */
+    @Override
+    public boolean isEndInclusive()
+    {
+        return false;
+    }
+
     public AbstractBounds<T> withNewRight(T newRight)
     {
         return new ExcludingBounds<T>(left, newRight);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e9e8e8e..446d0af 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -89,6 +89,16 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac
         return ")";
     }
 
+    /** An {@code IncludingExcludingBounds} always includes its start bound. */
+    @Override
+    public boolean isStartInclusive()
+    {
+        return true;
+    }
+
+    /** An {@code IncludingExcludingBounds} never includes its end bound. */
+    @Override
+    public boolean isEndInclusive()
+    {
+        return false;
+    }
+
     public AbstractBounds<T> withNewRight(T newRight)
     {
         return new IncludingExcludingBounds<T>(left, newRight);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index cbf093c..f99716b 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
 import java.util.*;
 
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.cassandra.db.RowPosition;
+
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -372,6 +373,16 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
         return "]";
     }
 
+    /** A {@code Range} never includes its start bound. */
+    @Override
+    public boolean isStartInclusive()
+    {
+        return false;
+    }
+
+    /** A {@code Range} always includes its end bound. */
+    @Override
+    public boolean isEndInclusive()
+    {
+        return true;
+    }
+
     public List<String> asList()
     {
         ArrayList<String> ret = new ArrayList<String>(2);
@@ -465,12 +476,12 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
     /**
      * Compute a range of keys corresponding to a given range of token.
      */
-    public static Range<RowPosition> makeRowRange(Token left, Token right)
+    public static Range<PartitionPosition> makeRowRange(Token left, Token right)
     {
-        return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound());
+        return new Range<PartitionPosition>(left.maxKeyBound(), right.maxKeyBound());
     }
 
-    public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds)
+    public static Range<PartitionPosition> makeRowRange(Range<Token> tokenBounds)
     {
         return makeRowRange(tokenBounds.left, tokenBounds.right);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 0cc8a2d..c87b46b 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -142,7 +142,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
             return (R)maxKeyBound();
     }
 
-    public static class KeyBound implements RowPosition
+    public static class KeyBound implements PartitionPosition
     {
         private final Token token;
         public final boolean isMinimumBound;
@@ -158,7 +158,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
             return token;
         }
 
-        public int compareTo(RowPosition pos)
+        public int compareTo(PartitionPosition pos)
         {
             if (this == pos)
                 return 0;
@@ -188,9 +188,9 @@ public abstract class Token implements RingPosition<Token>, Serializable
             return getToken().isMinimum();
         }
 
-        public RowPosition.Kind kind()
+        public PartitionPosition.Kind kind()
         {
-            return isMinimumBound ? RowPosition.Kind.MIN_BOUND : RowPosition.Kind.MAX_BOUND;
+            return isMinimumBound ? PartitionPosition.Kind.MIN_BOUND : PartitionPosition.Kind.MAX_BOUND;
         }
 
         @Override