You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:46 UTC

[22/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
new file mode 100644
index 0000000..ca1e424
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Sorting;
+
+/**
+ * Stores updates made on a partition.
+ * <p>
+ * A PartitionUpdate object requires that all writes are performed before we
+ * try to read the updates (attempts to write to the PartitionUpdate after a
+ * read method has been called will result in an exception being thrown).
+ * In other words, a PartitionUpdate is mutable while it is being written and
+ * becomes immutable as soon as it is read.
+ * <p>
+ * Row updates are added to this update through the {@link #writer} method which
+ * returns a {@link Row.Writer}. Multiple rows can be added to this writer as required,
+ * those rows do not have to be added in (clustering) order, and the same row can be added
+ * multiple times. Further, for a given row, the writer actually supports intermingling
+ * the writing of cells for different complex columns (note that this is usually not supported
+ * by {@code Row.Writer} implementations, but is supported here because
+ * {@code ModificationStatement} requires it (because we could have multiple {@link Operation}
+ * on the same column in a given statement)).
+ */
+public class PartitionUpdate extends AbstractPartitionData implements Sorting.Sortable
+{
+    protected static final Logger logger = LoggerFactory.getLogger(PartitionUpdate.class);
+
+    // Records whether the partition update has been sorted (it is the rows contained in the partition
+    // that are sorted since we don't require rows to be added in order). Sorting happens when the
+    // update is read, and writing is rejected as soon as the update is sorted (it's actually possible
+    // to manually allow new updates by using allowNewUpdates(), and we could make that more implicit, but
+    // as only triggers really require it, we keep it simple for now).
+    private boolean isSorted;
+
+    public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer();
+
+    private final Writer writer;
+
+    // Used by compare for the sake of implementing the Sorting.Sortable interface (which is in turn used
+    // to sort the rows of this update).
+    private final InternalReusableClustering p1 = new InternalReusableClustering();
+    private final InternalReusableClustering p2 = new InternalReusableClustering();
+
+    /**
+     * Creates an update backed by the provided, already allocated row data block.
+     * <p>
+     * All arguments are handed over to the parent constructor; the writer used for regular rows
+     * is then obtained through {@link #createWriter}.
+     */
+    private PartitionUpdate(CFMetaData metadata, DecoratedKey key, DeletionInfo delInfo,
+                            RowDataBlock data, PartitionColumns columns, int initialRowCapacity)
+    {
+        super(metadata, key, delInfo, columns, data, initialRowCapacity);
+        writer = createWriter();
+    }
+
+    /**
+     * Creates an update that starts with the provided deletion info, allocating a new row data
+     * block for the regular columns of {@code columns}.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the created update.
+     * @param delInfo the deletion info the created update starts with.
+     * @param columns the columns of the created update.
+     * @param initialRowCapacity the initial row capacity, used both for the row data block and
+     * for the update itself.
+     */
+    public PartitionUpdate(CFMetaData metadata, DecoratedKey key, DeletionInfo delInfo,
+                           PartitionColumns columns, int initialRowCapacity)
+    {
+        this(metadata, key, delInfo,
+             new RowDataBlock(columns.regulars, initialRowCapacity, true, metadata.isCounter()),
+             columns, initialRowCapacity);
+    }
+
+    /**
+     * Creates an update whose initial deletion info is {@code DeletionInfo.live()}.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the created update.
+     * @param columns the columns of the created update.
+     * @param initialRowCapacity the initial row capacity of the created update.
+     */
+    public PartitionUpdate(CFMetaData metadata, DecoratedKey key, PartitionColumns columns, int initialRowCapacity)
+    {
+        this(metadata, key, DeletionInfo.live(), columns, initialRowCapacity);
+    }
+
+    /**
+     * Creates the writer used to add (non-static) rows to this update.
+     * <p>
+     * This is invoked once, from the constructor, and the result is what {@link #writer} hands out.
+     *
+     * @return a new {@code RegularWriter}.
+     */
+    protected Writer createWriter()
+    {
+        return new RegularWriter();
+    }
+
+    /**
+     * Creates a writer for the static row of this update.
+     * <p>
+     * This is invoked by every call to {@link #staticWriter}.
+     *
+     * @return a new {@code StaticWriter}.
+     */
+    protected StaticWriter createStaticWriter()
+    {
+        return new StaticWriter();
+    }
+
+    /**
+     * Deserialize a partition update from a provided byte buffer.
+     *
+     * @param bytes the byte buffer that contains the serialized update.
+     * @param version the version with which the update is serialized.
+     * @param key the partition key for the update. This is only used if {@code version < 3.0}
+     * and can be {@code null} otherwise.
+     *
+     * @return the deserialized update or {@code null} if {@code bytes == null}.
+     *
+     * @throws RuntimeException wrapping any {@code IOException} thrown during deserialization.
+     */
+    public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key)
+    {
+        if (bytes == null)
+            return null;
+
+        // The key is only forwarded for pre-3.0 versions; otherwise the serializer is given null.
+        DecoratedKey legacyKey = version < MessagingService.VERSION_30 ? key : null;
+        try
+        {
+            DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
+            return serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, legacyKey);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Serialize a partition update as a byte buffer.
+     * <p>
+     * The update is serialized with the requested {@code version}, so the result can be read back
+     * by {@link #fromBytes} when given that same version. Note that the serializer currently
+     * throws {@code UnsupportedOperationException} for versions older than
+     * {@code MessagingService.VERSION_30}.
+     *
+     * @param update the partition update to serialize.
+     * @param version the version to serialize the update into.
+     *
+     * @return a newly allocated byte buffer containing the serialized update.
+     *
+     * @throws RuntimeException wrapping any {@code IOException} thrown during serialization.
+     */
+    public static ByteBuffer toBytes(PartitionUpdate update, int version)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            // Use the version requested by the caller rather than the current messaging version.
+            serializer.serialize(update, out, version);
+            return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates an empty immutable partition update.
+     * <p>
+     * The returned update is created with {@code PartitionColumns.NONE} and an initial row
+     * capacity of 0. Its {@code staticWriter()}, {@code writer()}, {@code addPartitionDeletion()}
+     * and {@code addRangeTombstone()} methods all throw {@code UnsupportedOperationException}.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the created update.
+     *
+     * @return the newly created empty (and immutable) update.
+     */
+    public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key)
+    {
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 0)
+        {
+            public Row.Writer staticWriter()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public Row.Writer writer()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void addPartitionDeletion(DeletionTime deletionTime)
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void addRangeTombstone(RangeTombstone range)
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /**
+     * Creates a partition update that entirely deletes a given partition.
+     * <p>
+     * The created update has no columns ({@code PartitionColumns.NONE}), a row capacity of 0 and
+     * a deletion info built from {@code timestamp} and {@code nowInSec}.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the partition that the created update should delete.
+     * @param timestamp the timestamp for the deletion.
+     * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion.
+     *
+     * @return the newly created partition deletion update.
+     */
+    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec)
+    {
+        DeletionInfo partitionDeletion = new DeletionInfo(timestamp, nowInSec);
+        RowDataBlock noRows = new RowDataBlock(Columns.NONE, 0, true, metadata.isCounter());
+        return new PartitionUpdate(metadata, key, partitionDeletion, noRows, PartitionColumns.NONE, 0);
+    }
+
+    /**
+     * Merges the provided updates, yielding a new update that incorporates all those updates.
+     * <p>
+     * If {@code updates} contains a single element, that element itself is returned (no copy is
+     * made). Otherwise, a new update is created whose columns are the union of the columns of
+     * all the merged updates and whose initial row capacity is the sum of their row counts.
+     * <p>
+     * All the updates must be for the same partition key and the same table ({@code cfId}); this
+     * is only checked through assertions.
+     *
+     * @param updates the collection of updates to merge. This shouldn't be empty.
+     *
+     * @return a partition update that includes (merges) all the updates from {@code updates}.
+     */
+    public static PartitionUpdate merge(Collection<PartitionUpdate> updates)
+    {
+        assert !updates.isEmpty();
+        if (updates.size() == 1)
+            return Iterables.getOnlyElement(updates);
+
+        int totalSize = 0;
+        PartitionColumns.Builder builder = PartitionColumns.builder();
+        DecoratedKey key = null;
+        CFMetaData metadata = null;
+        for (PartitionUpdate update : updates)
+        {
+            totalSize += update.rows; // sum of the row counts, used as initial capacity of the result
+            builder.addAll(update.columns());
+
+            if (key == null)
+                key = update.partitionKey();
+            else
+                assert key.equals(update.partitionKey());
+
+            if (metadata == null)
+                metadata = update.metadata();
+            else
+                assert metadata.cfId.equals(update.metadata().cfId);
+        }
+
+        // Used when merging rows to decide on liveness
+        int nowInSec = FBUtilities.nowInSeconds();
+        PartitionUpdate newUpdate = new PartitionUpdate(metadata, key, builder.build(), totalSize);
+        for (PartitionUpdate update : updates)
+        {
+            newUpdate.deletionInfo.add(update.deletionInfo);
+            if (!update.staticRow().isEmpty())
+            {
+                if (newUpdate.staticRow().isEmpty())
+                    newUpdate.staticRow = update.staticRow().takeAlias();
+                else
+                    Rows.merge(newUpdate.staticRow(), update.staticRow(), newUpdate.columns().statics, newUpdate.staticWriter(), nowInSec, SecondaryIndexManager.nullUpdater);
+            }
+            for (Row row : update)
+                row.copyTo(newUpdate.writer); // rows are appended as-is; duplicates are merged when the new update gets sorted
+        }
+        return newUpdate;
+    }
+
+    /**
+     * The number of "operations" contained in the update.
+     * <p>
+     * This is used by {@code Memtable} to approximate how much work this update does. In practice, this
+     * count how many rows are updated and how many ranges are deleted by the partition update.
+     *
+     * @return the number of "operations" performed by the update.
+     */
+    public int operationCount()
+    {
+        return rowCount()
+             + deletionInfo.rangeCount()
+             + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1);
+    }
+
+    /**
+     * The size of the data contained in this update.
+     * <p>
+     * This is the sum, over every row returned by {@link #iterator}, of the data size of the
+     * row clustering and of the data size of each cell of the row. As this iterates over the
+     * update, calling this method may trigger a sorting of the update.
+     *
+     * @return the size of the data contained in this update.
+     */
+    public int dataSize()
+    {
+        int size = 0;
+        for (Row row : this)
+        {
+            size += row.clustering().dataSize();
+            for (Cell cell : row)
+                size += cell.dataSize();
+        }
+        return size;
+    }
+
+    /**
+     * If a partition update has been read (and is thus unmodifiable), a call to this method
+     * makes the update modifiable again.
+     * <p>
+     * Concretely, this resets the flag recording that the update is sorted, which is the flag
+     * {@link #writer} checks to reject writes.
+     * <p>
+     * Please note that calling this method won't result in optimal behavior in the sense that
+     * even if very little is added to the update after this call, the whole update will be sorted
+     * again on read. This should thus be used sparingly (and if it turns out that we end up using
+     * this often, we should consider optimizing the behavior).
+     */
+    public synchronized void allowNewUpdates()
+    {
+        // This is synchronized to make extra sure things work properly even if this is
+        // called concurrently with sort() (which should be avoided in the first place, but
+        // better safe than sorry).
+        isSorted = false;
+    }
+
+    /**
+     * Returns an iterator that iterates over the rows of this update in clustering order.
+     * <p>
+     * Note that this might trigger a sorting of the update, and as such the update will not
+     * be modifiable anymore after this call.
+     *
+     * @return an iterator over the rows of this update.
+     */
+    @Override
+    public Iterator<Row> iterator()
+    {
+        maybeSort(); // sorts (and merges duplicate rows) on first read
+        return super.iterator();
+    }
+
+    /**
+     * Returns the sliceable unfiltered iterator of the parent class, after having made sure
+     * this update is sorted.
+     * <p>
+     * As with {@link #iterator}, this might trigger a sorting of the update, after which the
+     * update is not modifiable anymore.
+     */
+    @Override
+    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+    {
+        maybeSort();
+        return super.sliceableUnfilteredIterator(columns, reversed);
+    }
+
+    /**
+     * Validates the data contained in this update.
+     * <p>
+     * For every row of the update, the row clustering is validated against the table comparator
+     * and every cell of the row is validated in turn.
+     *
+     * @throws MarshalException if some of the data contained in this update is corrupted.
+     */
+    public void validate()
+    {
+        Iterator<Row> rowIter = iterator();
+        while (rowIter.hasNext())
+        {
+            Row row = rowIter.next();
+            metadata().comparator.validate(row.clustering());
+            for (Cell cell : row)
+            {
+                cell.validate();
+            }
+        }
+    }
+
+    /**
+     * The maximum timestamp used in this update.
+     * <p>
+     * This simply returns the value of the {@code maxTimestamp} field, which is not
+     * maintained by this class itself.
+     *
+     * @return the maximum timestamp used in this update.
+     */
+    public long maxTimestamp()
+    {
+        return maxTimestamp;
+    }
+
+    /**
+     * For an update on a counter table, returns a list containing a {@code CounterMark} for
+     * every counter contained in the update.
+     * <p>
+     * Each mark records the index of its row in iteration order. All the marks returned by one
+     * call are created with the same reusable clustering instance.
+     *
+     * @return a list with counter marks for every counter in this update.
+     */
+    public List<CounterMark> collectCounterMarks()
+    {
+        assert metadata().isCounter();
+
+        InternalReusableClustering sharedClustering = new InternalReusableClustering();
+        List<CounterMark> marks = new ArrayList<>();
+        int rowIdx = 0;
+        for (Row row : this)
+        {
+            for (Cell cell : row)
+            {
+                if (!cell.isCounterCell())
+                    continue;
+
+                marks.add(new CounterMark(sharedClustering, rowIdx, cell.column(), cell.path()));
+            }
+            rowIdx++;
+        }
+        return marks;
+    }
+
+    /**
+     * Returns a row writer for the static row of this partition update.
+     * <p>
+     * Every call returns the result of a new call to {@link #createStaticWriter}. Contrary to
+     * {@link #writer}, this method does not check whether the update has already been read.
+     *
+     * @return a row writer for the static row of this partition update. A partition
+     * update contains only one static row so only one row should be written through
+     * this writer (but if multiple rows are added, the latest written one wins).
+     */
+    public Row.Writer staticWriter()
+    {
+        return createStaticWriter();
+    }
+
+    /**
+     * Returns a row writer to add (non-static) rows to this partition update.
+     *
+     * @return a row writer to add (non-static) rows to this partition update.
+     * Multiple rows can be successively added this way and the rows added do not have
+     * to be in clustering order. Further, the same row can be added multiple times.
+     *
+     * @throws IllegalStateException if the update has already been sorted, that is if it
+     * has already been read (and {@link #allowNewUpdates} has not been called since).
+     */
+    public Row.Writer writer()
+    {
+        if (!isSorted)
+            return writer;
+
+        throw new IllegalStateException("An update should not written again once it has been read");
+    }
+
+    /**
+     * Returns a range tombstone marker writer to add range tombstones to this
+     * partition update.
+     * <p>
+     * Every call returns a new {@code RangeTombstoneCollector}.
+     * <p>
+     * Note that if more convenient, range tombstones can also be added using
+     * {@link #addRangeTombstone}.
+     *
+     * @param isReverseOrder whether the range tombstone markers will be provided to the returned writer
+     * in clustering order or in reverse clustering order.
+     * @return a range tombstone marker writer to add range tombstones to this update.
+     */
+    public RangeTombstoneMarker.Writer markerWriter(boolean isReverseOrder)
+    {
+        return new RangeTombstoneCollector(isReverseOrder);
+    }
+
+    /**
+     * The number of rows contained in this update.
+     * <p>
+     * This returns the current row count without sorting the update first. Since rows written
+     * multiple times are only merged together when the update is sorted, the value returned
+     * before the update has been read may count the same row more than once.
+     *
+     * @return the number of rows contained in this update.
+     */
+    public int size()
+    {
+        return rows;
+    }
+
+    /**
+     * Sorts this update unless it is already marked as sorted.
+     * <p>
+     * The flag is checked here without synchronization, and checked again by {@link #sort}
+     * (which is synchronized) before any work is done.
+     */
+    private void maybeSort()
+    {
+        if (!isSorted)
+            sort();
+    }
+
+    /**
+     * Sorts the rows of this update and merges together the rows that compare equal, then marks
+     * the update as sorted.
+     * <p>
+     * Nothing is done if the update is already marked sorted, and an update with at most one row
+     * is simply marked sorted.
+     * <p>
+     * NOTE(review): merging and compacting is done by calling {@code merge}/{@code move} on
+     * {@code data} only - confirm that this also keeps the per-row clustering information
+     * used by {@link #compare} in sync.
+     */
+    private synchronized void sort()
+    {
+        if (isSorted)
+            return;
+
+        if (rows > 1)
+        {
+            // After this, rows are in order but the same row may still appear several times in a run
+            Sorting.sort(this);
+
+            // Compact in place: lastKept is the index of the last row we have decided to keep
+            int lastKept = 0;
+            int nowInSec = FBUtilities.nowInSeconds();
+            for (int candidate = 1; candidate < rows; candidate++)
+            {
+                // Since rows are sorted, candidate is either equal to or greater than lastKept
+                if (compare(lastKept, candidate) == 0)
+                    data.merge(candidate, lastKept, nowInSec); // same row: fold candidate into lastKept
+                else if (++lastKept != candidate)
+                    data.move(candidate, lastKept);            // new row: store it just after the previous kept one
+            }
+
+            // lastKept is the index of the last row to keep
+            rows = lastKept + 1;
+        }
+
+        isSorted = true;
+    }
+
+    /**
+     * Compares the clusterings of the rows at indexes {@code i} and {@code j} using the
+     * comparator of the table metadata.
+     * <p>
+     * This method is not meant to be used externally: it is only public so this
+     * update conforms to the {@link Sorting.Sortable} interface.
+     * <p>
+     * The comparison goes through the two reusable clustering instances held by this update
+     * ({@code p1} and {@code p2}), which are repositioned on every call.
+     */
+    public int compare(int i, int j)
+    {
+        return metadata.comparator.compare(p1.setTo(i), p2.setTo(j));
+    }
+
+    /**
+     * Writer for the static row of the enclosing update.
+     * <p>
+     * Each row completed through this writer either becomes the static row of the update (when
+     * the {@code staticRow} field is {@code null}) or is merged into the existing static row.
+     */
+    protected class StaticWriter extends StaticRow.Builder
+    {
+        protected StaticWriter()
+        {
+            super(columns.statics, false, metadata().isCounter());
+        }
+
+        /**
+         * Completes the row being written and installs it in (or merges it into) the static
+         * row of the enclosing update.
+         */
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+
+            if (staticRow == null)
+            {
+                staticRow = build();
+                return;
+            }
+
+            // A static row is already set: merge it with the newly written one into a fresh builder
+            StaticRow.Builder builder = StaticRow.builder(columns.statics, true, metadata().isCounter());
+            Rows.merge(staticRow, build(), columns.statics, builder, FBUtilities.nowInSeconds());
+            staticRow = builder.build();
+        }
+    }
+
+    /**
+     * Writer for the regular (non-static) rows of the enclosing update.
+     * <p>
+     * On top of what its parent does, this writer tolerates cells of complex columns being
+     * written intermingled or out of cell path order for a given row: when that happens, the
+     * current row is ended and a new row with the same clustering is started.
+     */
+    protected class RegularWriter extends Writer
+    {
+        // For complex column, the writer assumptions is that for a given row, cells of different
+        // complex columns are not intermingled (they also should be in cellPath order). We however
+        // don't yet guarantee that this will be the case for updates (both UpdateStatement and
+        // RowUpdateBuilder can potentially break that assumption; we could change those classes but
+        // that's non trivial, at least for UpdateStatement).
+        // To deal with that problem, we record which complex columns have been updated (for the current
+        // row) and if we detect a violation of our assumption, we switch the row we're writing
+        // into (which is ok because everything will be sorted and merged in maybeSort()).
+        private final Set<ColumnDefinition> updatedComplex = new HashSet<>();
+        private ColumnDefinition lastUpdatedComplex;
+        private CellPath lastUpdatedComplexPath;
+
+        public RegularWriter()
+        {
+            super(false);
+        }
+
+        /**
+         * Writes a cell, first switching to a new row (with the same clustering values) if the
+         * cell is for a complex column already updated in the current row and is either not for
+         * the last updated complex column or not strictly after the last written cell path.
+         */
+        @Override
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            if (column.isComplex())
+            {
+                if (updatedComplex.contains(column)
+                    && (!column.equals(lastUpdatedComplex) || (column.cellPathComparator().compare(path, lastUpdatedComplexPath)) <= 0))
+                {
+                    // We've updated that complex already, but we've either updated another complex or it's not in order: as this
+                    // break the writer assumption, switch rows.
+                    endOfRow();
+
+                    // Copy the clustering values from the previous row
+                    int clusteringSize = metadata.clusteringColumns().size();
+                    int base = (row - 1) * clusteringSize;
+                    for (int i = 0; i < clusteringSize; i++)
+                        writer.writeClusteringValue(clusterings[base + i]);
+
+                    updatedComplex.clear();
+                }
+
+                lastUpdatedComplex = column;
+                lastUpdatedComplexPath = path;
+                updatedComplex.add(column);
+            }
+            super.writeCell(column, isCounter, value, info, path);
+        }
+
+        /**
+         * Ends the current row and forgets which complex columns were updated in it.
+         */
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+            clear();
+        }
+
+        /**
+         * Resets this writer, including the tracking of updated complex columns.
+         */
+        @Override
+        public Writer reset()
+        {
+            super.reset();
+            clear();
+            return this;
+        }
+
+        // Resets the per-row tracking of complex column updates.
+        private void clear()
+        {
+            updatedComplex.clear();
+            lastUpdatedComplex = null;
+            lastUpdatedComplexPath = null;
+        }
+    }
+
+    public static class PartitionUpdateSerializer
+    {
+        public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+            {
+                // TODO: serializing to a pre-3.0 version is not supported yet (the commented-out code below is presumably the pre-refactor logic - to be confirmed)
+                throw new UnsupportedOperationException();
+
+                // if (cf == null)
+                // {
+                //     out.writeBoolean(false);
+                //     return;
+                // }
+
+                // out.writeBoolean(true);
+                // serializeCfId(cf.id(), out, version);
+                // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
+                // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
+                // int count = cf.getColumnCount();
+                // out.writeInt(count);
+                // int written = 0;
+                // for (Cell cell : cf)
+                // {
+                //     columnSerializer.serialize(cell, out);
+                //     written++;
+                // }
+                // assert count == written: "Table had " + count + " columns, but " + written + " written";
+            }
+
+            try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
+            {
+                assert !iter.isReverseOrder(); // deserialize() asserts the header is not reversed
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows); // the whole update is written through the row iterator serializer, with its row count
+            }
+        }
+
+        public PartitionUpdate deserialize(DataInput in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+            {
+                assert key != null; // the old format requires the caller to provide the partition key
+
+                // This is only used in mutations, and mutations have never allowed "null" column families
+                boolean present = in.readBoolean();
+                assert present;
+
+                CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+                LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
+                int size = in.readInt();
+                Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
+                SerializationHelper helper = new SerializationHelper(version, flag);
+                try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper))
+                {
+                    return UnfilteredRowIterators.toUpdate(iterator);
+                }
+            }
+
+            assert key == null; // key is only there for the old format
+
+            UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
+            if (h.isEmpty)
+                return emptyUpdate(h.metadata, h.key);
+
+            assert !h.isReversed;
+            assert h.rowEstimate >= 0;
+            PartitionUpdate upd = new PartitionUpdate(h.metadata,
+                                                      h.key,
+                                                      new DeletionInfo(h.partitionDeletion),
+                                                      new RowDataBlock(h.sHeader.columns().regulars, h.rowEstimate, false, h.metadata.isCounter()),
+                                                      h.sHeader.columns(),
+                                                      h.rowEstimate);
+
+            upd.staticRow = h.staticRow; // the static row comes with the header
+
+            RangeTombstoneMarker.Writer markerWriter = upd.markerWriter(false);
+            UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(version, flag), h.sHeader, upd.writer(), markerWriter);
+
+            // Mark sorted after we've read it all since that's what we use in the writer() method to detect bad uses
+            upd.isSorted = true;
+
+            return upd;
+        }
+
+        public long serializedSize(PartitionUpdate update, int version, TypeSizes sizes)
+        {
+            if (version < MessagingService.VERSION_30)
+            {
+                // TODO: computing the size for a pre-3.0 version is not supported yet (the commented-out code below is presumably the pre-refactor logic - to be confirmed)
+                throw new UnsupportedOperationException("Version is " + version);
+                //if (cf == null)
+                //{
+                //    return typeSizes.sizeof(false);
+                //}
+                //else
+                //{
+                //    return typeSizes.sizeof(true)  /* nullness bool */
+                //        + cfIdSerializedSize(cf.id(), typeSizes, version)  /* id */
+                //        + contentSerializedSize(cf, typeSizes, version);
+                //}
+            }
+
+            try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
+            {
+                return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows, sizes); // same iterator and row count as used by serialize()
+            }
+        }
+    }
+
+    /**
+     * A counter mark is basically a pointer to a counter update inside this partition update. That pointer allows
+     * us to update the counter value based on the pre-existing value read during the read-before-write that counters
+     * do. See {@link CounterMutation} to understand how this is used.
+     * <p>
+     * A mark identifies its counter through a row index, a column and a cell path, and reads and writes the
+     * value through the row data of the enclosing update. Note that the marks created by a single call to
+     * {@code collectCounterMarks()} all hold the same reusable clustering instance, so the object returned by
+     * {@link #clustering} is presumably only valid until {@code clustering()} is called on another of those marks
+     * (to be confirmed against {@code InternalReusableClustering.setTo}).
+     */
+    public class CounterMark
+    {
+        private final InternalReusableClustering clustering;
+        private final int row;
+        private final ColumnDefinition column;
+        private final CellPath path;
+
+        private CounterMark(InternalReusableClustering clustering, int row, ColumnDefinition column, CellPath path)
+        {
+            this.clustering = clustering;
+            this.row = row;
+            this.column = column;
+            this.path = path;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering.setTo(row); // repositions the reusable clustering on the row of this mark
+        }
+
+        public ColumnDefinition column()
+        {
+            return column;
+        }
+
+        public CellPath path()
+        {
+            return path;
+        }
+
+        public ByteBuffer value()
+        {
+            return data.getValue(row, column, path); // reads from the row data of the enclosing update
+        }
+
+        public void setValue(ByteBuffer value)
+        {
+            data.setValue(row, column, path, value); // writes to the row data of the enclosing update
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..e2fec05
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * An {@link UnfilteredPartitionIterator} over exactly one partition: the wrapped row iterator
+ * is returned by the first call to {@link #next}, after which this iterator is exhausted.
+ * Closing this iterator closes the wrapped row iterator.
+ */
+public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator
+{
+    private final UnfilteredRowIterator partition;
+    private final boolean isForThrift;
+
+    // Whether the single partition has already been handed out by next()
+    private boolean consumed;
+
+    public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter, boolean isForThrift)
+    {
+        this.partition = iter;
+        this.isForThrift = isForThrift;
+    }
+
+    public boolean isForThrift()
+    {
+        return isForThrift;
+    }
+
+    public boolean hasNext()
+    {
+        return !consumed;
+    }
+
+    public UnfilteredRowIterator next()
+    {
+        if (!consumed)
+        {
+            consumed = true;
+            return partition;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void close()
+    {
+        partition.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
new file mode 100644
index 0000000..10022eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+public abstract class TombstonePurgingPartitionIterator extends FilteringPartitionIterator
+{
+    private final int gcBefore;
+
+    public TombstonePurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore)
+    {
+        super(iterator);
+        this.gcBefore = gcBefore;
+    }
+
+    protected abstract long getMaxPurgeableTimestamp();
+
+    protected FilteringRow makeRowFilter()
+    {
+        return new FilteringRow()
+        {
+            @Override
+            protected boolean include(LivenessInfo info)
+            {
+                return !info.hasLocalDeletionTime() || !info.isPurgeable(getMaxPurgeableTimestamp(), gcBefore);
+            }
+
+            @Override
+            protected boolean include(DeletionTime dt)
+            {
+                return includeDelTime(dt);
+            }
+
+            @Override
+            protected boolean include(ColumnDefinition c, DeletionTime dt)
+            {
+                return includeDelTime(dt);
+            }
+        };
+    }
+
+    private boolean includeDelTime(DeletionTime dt)
+    {
+        return dt.isLive() || !dt.isPurgeable(getMaxPurgeableTimestamp(), gcBefore);
+    }
+
+    @Override
+    protected boolean includePartitionDeletion(DeletionTime dt)
+    {
+        return includeDelTime(dt);
+    }
+
+    @Override
+    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+    {
+        if (marker.isBoundary())
+        {
+            // We can only skip the whole marker if both deletion times are purgeable.
+            // If only one of them is, filterRangeTombstoneMarker will deal with it.
+            RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+            return includeDelTime(boundary.endDeletionTime()) || includeDelTime(boundary.startDeletionTime());
+        }
+        else
+        {
+            return includeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime());
+        }
+    }
+
+    @Override
+    protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+    {
+        if (!marker.isBoundary())
+            return marker;
+
+        // Note that we know this is called after includeRangeTombstoneMarker. So if one of the deletion times is
+        // purgeable, we know the other one isn't.
+        RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
+        if (!(includeDelTime(boundary.closeDeletionTime(reversed))))
+            return boundary.createCorrespondingCloseBound(reversed);
+        else if (!(includeDelTime(boundary.openDeletionTime(reversed))))
+            return boundary.createCorrespondingOpenBound(reversed);
+        return boundary;
+    }
+
+};

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
new file mode 100644
index 0000000..2447da8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+/**
+ * An iterator over a number of unfiltered partitions (i.e. partitions containing deletion information).
+ *
+ * The object returned by a call to next() is only guaranteed to be
+ * valid until the next call to hasNext() or next(). If a consumer wants to keep a
+ * reference on the returned objects for longer than the iteration, it must
+ * make a copy of it explicitly.
+ */
+public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowIterator>, AutoCloseable
+{
+    /**
+     * Whether this partition iterator is for a thrift query.
+     * <p>
+     * If this is true, the partition iterator may return some empty UnfilteredRowIterator and those
+     * should be preserved, as thrift includes partitions that "exist" (have some cells, even
+     * if these are actually deleted) but have nothing matching the query.
+     *
+     * @return whether the iterator is for a thrift query.
+     */
+    public boolean isForThrift();
+
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
new file mode 100644
index 0000000..f66ec11
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOError;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.MergeIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static methods to work with partition iterators.
+ */
+public abstract class UnfilteredPartitionIterators
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredPartitionIterators.class);
+
+    private static final Serializer serializer = new Serializer();
+
+    private static final Comparator<UnfilteredRowIterator> partitionComparator = new Comparator<UnfilteredRowIterator>()
+    {
+        public int compare(UnfilteredRowIterator p1, UnfilteredRowIterator p2)
+        {
+            return p1.partitionKey().compareTo(p2.partitionKey());
+        }
+    };
+
+    public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator()
+    {
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public UnfilteredRowIterator next()
+        {
+            throw new NoSuchElementException();
+        }
+    };
+
+    private UnfilteredPartitionIterators() {}
+
+    public interface MergeListener
+    {
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions);
+        public void close();
+    }
+
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
+    {
+        // If the query has no results, we'll get an empty iterator, but we still
+        // want a RowIterator out of this method, so we return an empty one.
+        UnfilteredRowIterator toReturn = iter.hasNext()
+                              ? iter.next()
+                              : UnfilteredRowIterators.emptyIterator(command.metadata(),
+                                                                     command.partitionKey(),
+                                                                     command.clusteringIndexFilter().isReversed());
+
+        // Note that in general, we should wrap the result so that its close method actually
+        // closes the whole UnfilteredPartitionIterator.
+        return new WrappingUnfilteredRowIterator(toReturn)
+        {
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    // asserting this only now because it bothers Serializer if hasNext() is called before
+                    // the previously returned iterator has been fully consumed.
+                    assert !iter.hasNext();
+
+                    iter.close();
+                }
+            }
+        };
+    }
+
+    public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
+    {
+        // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
+        return filter(merge(iterators, nowInSec, listener), nowInSec);
+    }
+
+    public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
+    {
+        return new PartitionIterator()
+        {
+            private RowIterator next;
+
+            public boolean hasNext()
+            {
+                while (next == null && iterator.hasNext())
+                {
+                    @SuppressWarnings("resource") // closed either directly if empty, or, if assigned to next, by either
+                                                  // the caller of next() or close()
+                    UnfilteredRowIterator rowIterator = iterator.next();
+                    next = UnfilteredRowIterators.filter(rowIterator, nowInSec);
+                    if (!iterator.isForThrift() && next.isEmpty())
+                    {
+                        rowIterator.close();
+                        next = null;
+                    }
+                }
+                return next != null;
+            }
+
+            public RowIterator next()
+            {
+                if (next == null && !hasNext())
+                    throw new NoSuchElementException();
+
+                RowIterator toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                try
+                {
+                    iterator.close();
+                }
+                finally
+                {
+                    if (next != null)
+                        next.close();
+                }
+            }
+        };
+    }
+
+    public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener)
+    {
+        assert listener != null;
+        assert !iterators.isEmpty();
+
+        final boolean isForThrift = iterators.get(0).isForThrift();
+
+        final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
+        {
+            private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
+
+            private CFMetaData metadata;
+            private DecoratedKey partitionKey;
+            private boolean isReverseOrder;
+
+            public void reduce(int idx, UnfilteredRowIterator current)
+            {
+                metadata = current.metadata();
+                partitionKey = current.partitionKey();
+                isReverseOrder = current.isReverseOrder();
+
+                // Note that because the MergeListener cares about it, we want to preserve the index of the iterator.
+                // Non-present iterator will thus be set to empty in getReduced.
+                toMerge.set(idx, current);
+            }
+
+            protected UnfilteredRowIterator getReduced()
+            {
+                UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge);
+
+                // Replace nulls by empty iterators
+                for (int i = 0; i < toMerge.size(); i++)
+                    if (toMerge.get(i) == null)
+                        toMerge.set(i, UnfilteredRowIterators.emptyIterator(metadata, partitionKey, isReverseOrder));
+
+                return UnfilteredRowIterators.merge(toMerge, nowInSec, rowListener);
+            }
+
+            protected void onKeyChange()
+            {
+                toMerge.clear();
+                for (int i = 0; i < iterators.size(); i++)
+                    toMerge.add(null);
+            }
+        });
+
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public boolean isForThrift()
+            {
+                return isForThrift;
+            }
+
+            public boolean hasNext()
+            {
+                return merged.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                return merged.next();
+            }
+
+            @Override
+            public void close()
+            {
+                merged.close();
+            }
+        };
+    }
+
+    /**
+     * Convert all expired cells to equivalent tombstones.
+     * <p>
+     * See {@link UnfilteredRowIterators#convertExpiredCellsToTombstones} for details.
+     *
+     * @param iterator the iterator in which to convert expired cells.
+     * @param nowInSec the current time to use to decide if a cell is expired.
+     * @return an iterator that returns the same data as {@code iterator} but with all expired cells converted
+     * to equivalent tombstones.
+     */
+    public static UnfilteredPartitionIterator convertExpiredCellsToTombstones(UnfilteredPartitionIterator iterator, final int nowInSec)
+    {
+        return new WrappingUnfilteredPartitionIterator(iterator)
+        {
+            @Override
+            protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            {
+                return UnfilteredRowIterators.convertExpiredCellsToTombstones(iter, nowInSec);
+            }
+        };
+    }
+
+    public static UnfilteredPartitionIterator mergeLazily(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec)
+    {
+        assert !iterators.isEmpty();
+
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        final boolean isForThrift = iterators.get(0).isForThrift();
+
+        final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>()
+        {
+            private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size());
+
+            @Override
+            public boolean trivialReduceIsTrivial()
+            {
+                return false;
+            }
+
+            public void reduce(int idx, UnfilteredRowIterator current)
+            {
+                toMerge.add(current);
+            }
+
+            protected UnfilteredRowIterator getReduced()
+            {
+                return new LazilyInitializedUnfilteredRowIterator(toMerge.get(0).partitionKey())
+                {
+                    protected UnfilteredRowIterator initializeIterator()
+                    {
+                        return UnfilteredRowIterators.merge(toMerge, nowInSec);
+                    }
+                };
+            }
+
+            protected void onKeyChange()
+            {
+                toMerge.clear();
+            }
+        });
+
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public boolean isForThrift()
+            {
+                return isForThrift;
+            }
+
+            public boolean hasNext()
+            {
+                return merged.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                return merged.next();
+            }
+
+            @Override
+            public void close()
+            {
+                merged.close();
+            }
+        };
+    }
+
+    public static UnfilteredPartitionIterator removeDroppedColumns(UnfilteredPartitionIterator iterator, final Map<ColumnIdentifier, CFMetaData.DroppedColumn> droppedColumns)
+    {
+        return new FilteringPartitionIterator(iterator)
+        {
+            @Override
+            protected FilteringRow makeRowFilter()
+            {
+                return new FilteringRow()
+                {
+                    @Override
+                    protected boolean include(Cell cell)
+                    {
+                        return include(cell.column(), cell.livenessInfo().timestamp());
+                    }
+
+                    @Override
+                    protected boolean include(ColumnDefinition c, DeletionTime dt)
+                    {
+                        return include(c, dt.markedForDeleteAt());
+                    }
+
+                    private boolean include(ColumnDefinition column, long timestamp)
+                    {
+                        CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name);
+                        return dropped == null || timestamp > dropped.droppedTime;
+                    }
+                };
+            }
+
+            @Override
+            protected boolean shouldFilter(UnfilteredRowIterator iterator)
+            {
+                // TODO: We could have row iterators return the smallest timestamp they might return
+                // (which we can get from sstable stats), and ignore any dropping if that smallest
+                // timestamp is bigger than the biggest droppedColumns timestamp.
+
+                // If none of the dropped columns is part of the columns that the iterator actually returns, there is nothing to do.
+                for (ColumnDefinition c : iterator.columns())
+                    if (droppedColumns.containsKey(c.name))
+                        return true;
+
+                return false;
+            }
+        };
+    }
+
+    public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest)
+    {
+        try (UnfilteredPartitionIterator iter = iterator)
+        {
+            while (iter.hasNext())
+            {
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    UnfilteredRowIterators.digest(partition, digest);
+                }
+            }
+        }
+    }
+
+    public static Serializer serializerForIntraNode()
+    {
+        return serializer;
+    }
+
+    /**
+     * Wraps the provided iterator so it logs the returned rows/RT for debugging purposes.
+     * <p>
+     * Note that this is only meant for debugging as this can produce a very large amount of
+     * logging at INFO.
+     */
+    public static UnfilteredPartitionIterator loggingIterator(UnfilteredPartitionIterator iterator, final String id, final boolean fullDetails)
+    {
+        return new WrappingUnfilteredPartitionIterator(iterator)
+        {
+            public UnfilteredRowIterator next()
+            {
+                return UnfilteredRowIterators.loggingIterator(super.next(), id, fullDetails);
+            }
+        };
+    }
+
+    /**
+     * Serialize each UnfilteredRowIterator one after the other, each preceded by a byte that indicates whether
+     * we're done or not.
+     */
+    public static class Serializer
+    {
+        public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                throw new UnsupportedOperationException();
+
+            out.writeBoolean(iter.isForThrift());
+            while (iter.hasNext())
+            {
+                out.writeBoolean(true);
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version);
+                }
+            }
+            out.writeBoolean(false);
+        }
+
+        public UnfilteredPartitionIterator deserialize(final DataInput in, final int version, final SerializationHelper.Flag flag) throws IOException
+        {
+            if (version < MessagingService.VERSION_30)
+                throw new UnsupportedOperationException();
+
+            final boolean isForThrift = in.readBoolean();
+
+            return new AbstractUnfilteredPartitionIterator()
+            {
+                private UnfilteredRowIterator next;
+                private boolean hasNext;
+                private boolean nextReturned = true;
+
+                public boolean isForThrift()
+                {
+                    return isForThrift;
+                }
+
+                public boolean hasNext()
+                {
+                    if (!nextReturned)
+                        return hasNext;
+
+                    // We can't answer this until the previously returned iterator has been fully consumed,
+                    // so complain if that's not the case.
+                    if (next != null && next.hasNext())
+                        throw new IllegalStateException("Cannot call hasNext() until the previous iterator has been fully consumed");
+
+                    try
+                    {
+                        hasNext = in.readBoolean();
+                        nextReturned = false;
+                        return hasNext;
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IOError(e);
+                    }
+                }
+
+                public UnfilteredRowIterator next()
+                {
+                    if (nextReturned && !hasNext())
+                        throw new NoSuchElementException();
+
+                    try
+                    {
+                        nextReturned = true;
+                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag);
+                        return next;
+                    }
+                    catch (IOException e)
+                    {
+                        throw new IOError(e);
+                    }
+                }
+
+                @Override
+                public void close()
+                {
+                    if (next != null)
+                        next.close();
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
new file mode 100644
index 0000000..4d4be70
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.RowIterator;
+
+public abstract class WrappingPartitionIterator implements PartitionIterator
+{
+    protected final PartitionIterator wrapped;
+
+    protected WrappingPartitionIterator(PartitionIterator wrapped)
+    {
+        this.wrapped = wrapped;
+    }
+
+    public boolean hasNext()
+    {
+        return wrapped.hasNext();
+    }
+
+    public RowIterator next()
+    {
+        return wrapped.next();
+    }
+
+    public void remove()
+    {
+        wrapped.remove();
+    }
+
+    public void close()
+    {
+        wrapped.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..4f35075
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+
+/**
+ * A utility class for writing partition iterators that filter/modify other
+ * partition iterators.
+ *
+ * This works a little bit like Guava's AbstractIterator in that you only need
+ * to implement the computeNext() method, though that method takes as argument
+ * the UnfilteredRowIterator to filter from the wrapped partition iterator.
+ */
+public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
+{
+    protected final UnfilteredPartitionIterator wrapped;
+
+    private UnfilteredRowIterator next;
+
+    protected WrappingUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped)
+    {
+        this.wrapped = wrapped;
+    }
+
+    public boolean isForThrift()
+    {
+        return wrapped.isForThrift();
+    }
+
+    public boolean hasNext()
+    {
+        prepareNext();
+        return next != null;
+    }
+
+    public UnfilteredRowIterator next()
+    {
+        prepareNext();
+        assert next != null;
+
+        UnfilteredRowIterator toReturn = next;
+        next = null;
+        return toReturn;
+    }
+
+    private void prepareNext()
+    {
+        while (next == null && wrapped.hasNext())
+        {
+            @SuppressWarnings("resource") // Closed on exception, right away if empty or ignored by computeNext, or if assigned to 'next',
+                                          // either by the caller to next(), or in close().
+            UnfilteredRowIterator wrappedNext = wrapped.next();
+            try
+            {
+                UnfilteredRowIterator maybeNext = computeNext(wrappedNext);
+
+                // As the wrapped iterator shouldn't return an empty iterator, if computeNext
+                // gave us back its input we save the isEmpty check.
+                if (maybeNext != null && (isForThrift() || maybeNext == wrappedNext || !maybeNext.isEmpty()))
+                {
+                    next = maybeNext;
+                    return;
+                }
+                else
+                {
+                    wrappedNext.close();
+                }
+            }
+            catch (RuntimeException | Error e)
+            {
+                wrappedNext.close();
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Given the next UnfilteredRowIterator from the wrapped partition iterator, return
+     * the (potentially modified) UnfilteredRowIterator to return. Please note that the
+     * result will be skipped if it's either {@code null} or if it's empty.
+     *
+     * The default implementation returns its input unchanged to make it easier
+     * to write wrapping partition iterators that only change the close method.
+     */
+    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    {
+        return iter;
+    }
+
+    @Override
+    public void close()
+    {
+        try
+        {
+            wrapped.close();
+        }
+        finally
+        {
+            if (next != null)
+                next.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
new file mode 100644
index 0000000..c003d6f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base abstract class for {@code Cell} implementations.
+ *
+ * Unless you have a very good reason not to, every cell implementation
+ * should probably extend this class.
+ */
+public abstract class AbstractCell implements Cell
+{
+    public boolean isLive(int nowInSec)
+    {
+        return livenessInfo().isLive(nowInSec);
+    }
+
+    public boolean isTombstone()
+    {
+        return livenessInfo().hasLocalDeletionTime() && !livenessInfo().hasTTL();
+    }
+
+    public boolean isExpiring()
+    {
+        return livenessInfo().hasTTL();
+    }
+
+    public void writeTo(Row.Writer writer)
+    {
+        writer.writeCell(column(), isCounterCell(), value(), livenessInfo(), path());
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        digest.update(value().duplicate());
+        livenessInfo().digest(digest);
+        FBUtilities.updateWithBoolean(digest, isCounterCell());
+        if (path() != null)
+            path().digest(digest);
+    }
+
+    public void validate()
+    {
+        column().validateCellValue(value());
+
+        livenessInfo().validate();
+
+        // If cell is a tombstone, it shouldn't have a value.
+        if (isTombstone() && value().hasRemaining())
+            throw new MarshalException("A tombstone should not have a value");
+
+        if (path() != null)
+            column().validateCellPath(path());
+    }
+
+    public int dataSize()
+    {
+        // Value bytes plus liveness info, plus the cell path when the cell has one.
+        int valueAndInfoSize = value().remaining() + livenessInfo().dataSize();
+        if (path() == null)
+            return valueAndInfoSize;
+        return valueAndInfoSize + path().dataSize();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof Cell))
+            return false;
+
+        Cell that = (Cell)other;
+        return this.column().equals(that.column())
+            && this.isCounterCell() == that.isCounterCell()
+            && Objects.equals(this.value(), that.value())
+            && Objects.equals(this.livenessInfo(), that.livenessInfo())
+            && Objects.equals(this.path(), that.path());
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(column(), isCounterCell(), value(), livenessInfo(), path());
+    }
+
+    @Override
+    public String toString()
+    {
+        if (isCounterCell())
+            return String.format("[%s=%d ts=%d]", column().name, CounterContext.instance().total(value()), livenessInfo().timestamp());
+
+        AbstractType<?> type = column().type;
+        if (type instanceof CollectionType && type.isMultiCell())
+        {
+            CollectionType ct = (CollectionType)type;
+            return String.format("[%s[%s]=%s info=%s]",
+                                 column().name,
+                                 ct.nameComparator().getString(path().get(0)),
+                                 ct.valueComparator().getString(value()),
+                                 livenessInfo());
+        }
+        return String.format("[%s=%s info=%s]", column().name, type.getString(value()), livenessInfo());
+    }
+
+    public Cell takeAlias()
+    {
+        // Cell is always used as an Aliasable object, but as the code currently
+        // never needs to alias a cell outside of its valid scope, aliasing is
+        // not implemented yet.
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
new file mode 100644
index 0000000..d8256fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+public abstract class AbstractRangeTombstoneMarker implements RangeTombstoneMarker
+{
+    protected final RangeTombstone.Bound bound;
+
+    protected AbstractRangeTombstoneMarker(RangeTombstone.Bound bound)
+    {
+        this.bound = bound;
+    }
+
+    public RangeTombstone.Bound clustering()
+    {
+        return bound;
+    }
+
+    public Unfiltered.Kind kind()
+    {
+        return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+    }
+
+    public void validateData(CFMetaData metadata)
+    {
+        // Validate each non-null bound component with the comparator subtype at its index.
+        Slice.Bound components = clustering();
+        for (int idx = 0; idx < components.size(); idx++)
+        {
+            ByteBuffer component = components.get(idx);
+            if (component != null) metadata.comparator.subtype(idx).validate(component);
+        }
+    }
+
+    public String toString(CFMetaData metadata, boolean fullDetails)
+    {
+        return toString(metadata);
+    }
+
+    protected void copyBoundTo(RangeTombstoneMarker.Writer writer)
+    {
+        for (int i = 0; i < bound.size(); i++)
+            writer.writeClusteringValue(bound.get(i));
+        writer.writeBoundKind(bound.kind());
+    }
+
+    public Unfiltered takeAlias()
+    {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
new file mode 100644
index 0000000..03aeb88
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+public abstract class AbstractReusableRow extends AbstractRow
+{
+    private CellData.ReusableCell simpleCell;
+    private ComplexRowDataBlock.ReusableIterator complexCells;
+    private DeletionTimeArray.Cursor complexDeletionCursor;
+    private RowDataBlock.ReusableIterator iterator;
+
+    public AbstractReusableRow()
+    {
+    }
+
+    protected abstract int row();
+    protected abstract RowDataBlock data();
+
+    private CellData.ReusableCell simpleCell()
+    {
+        if (simpleCell == null)
+            simpleCell = SimpleRowDataBlock.reusableCell();
+        return simpleCell;
+    }
+
+    private ComplexRowDataBlock.ReusableIterator complexCells()
+    {
+        if (complexCells == null)
+            complexCells = ComplexRowDataBlock.reusableComplexCells();
+        return complexCells;
+    }
+
+    private DeletionTimeArray.Cursor complexDeletionCursor()
+    {
+        if (complexDeletionCursor == null)
+            complexDeletionCursor = ComplexRowDataBlock.complexDeletionCursor();
+        return complexDeletionCursor;
+    }
+
+    private RowDataBlock.ReusableIterator reusableIterator()
+    {
+        if (iterator == null)
+            iterator = RowDataBlock.reusableIterator();
+        return iterator;
+    }
+
+    public Columns columns()
+    {
+        return data().columns();
+    }
+
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        if (data().simpleData == null)
+            return null;
+
+        int idx = columns().simpleIdx(c, 0);
+        if (idx < 0)
+            return null;
+
+        return simpleCell().setTo(data().simpleData.data, c, (row() * columns().simpleColumnCount()) + idx);
+    }
+
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+
+        ComplexRowDataBlock data = data().complexData;
+        if (data == null)
+            return null;
+
+        int idx = data.cellIdx(row(), c, path);
+        if (idx < 0)
+            return null;
+
+        return simpleCell().setTo(data.cellData(row()), c, idx);
+    }
+
+    public Iterator<Cell> getCells(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        return complexCells().setTo(data().complexData, row(), c);
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        return data().hasComplexDeletion(row());
+    }
+
+    public DeletionTime getDeletion(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        // No complex data block, or no deletion index for this column: report it as live.
+        ComplexRowDataBlock complex = data().complexData;
+        if (complex == null)
+            return DeletionTime.LIVE;
+
+        int idx = complex.complexDeletionIdx(row(), c);
+        return idx < 0 ? DeletionTime.LIVE : complexDeletionCursor().setTo(complex.complexDelTimes, idx);
+    }
+
+    public Iterator<Cell> iterator()
+    {
+        return reusableIterator().setTo(data(), row());
+    }
+
+    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return new SearchIterator<ColumnDefinition, ColumnData>()
+        {
+            private int simpleIdx = 0;
+
+            public boolean hasNext()
+            {
+                // TODO: we can do better, but we expect users to not rely on this anyway
+                return true;
+            }
+
+            public ColumnData next(ColumnDefinition column)
+            {
+                if (column.isComplex())
+                {
+                    // TODO: this is sub-optimal
+
+                    Iterator<Cell> cells = getCells(column);
+                    return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
+                }
+                else
+                {
+                    int idx = columns().simpleIdx(column, simpleIdx);
+                    if (idx < 0)
+                        return null;
+
+                    Cell cell = simpleCell().setTo(data().simpleData.data, column, (row() * columns().simpleColumnCount()) + idx);
+                    simpleIdx = idx + 1; // passed to the next simpleIdx() lookup (presumably its start position)
+                    return cell == null ? null : new ColumnData(column, cell, null, null);
+                }
+            }
+        };
+    }
+
+    public Row takeAlias()
+    {
+        final Clustering clustering = clustering().takeAlias();
+        final LivenessInfo info = primaryKeyLivenessInfo().takeAlias();
+        final DeletionTime deletion = deletion().takeAlias();
+
+        final RowDataBlock data = data();
+        final int row = row();
+
+        return new AbstractReusableRow()
+        {
+            protected RowDataBlock data()
+            {
+                return data;
+            }
+
+            protected int row()
+            {
+                return row;
+            }
+
+            public Clustering clustering()
+            {
+                return clustering;
+            }
+
+            public LivenessInfo primaryKeyLivenessInfo()
+            {
+                return info;
+            }
+
+            public DeletionTime deletion()
+            {
+                return deletion;
+            }
+
+            @Override
+            public Row takeAlias()
+            {
+                return this;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
new file mode 100644
index 0000000..a99bc78
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Iterator;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base abstract class for {@code Row} implementations.
+ *
+ * Unless you have a very good reason not to, every row implementation
+ * should probably extend this class.
+ */
+public abstract class AbstractRow implements Row
+{
+    public Unfiltered.Kind kind()
+    {
+        return Unfiltered.Kind.ROW;
+    }
+
+    public boolean hasLiveData(int nowInSec)
+    {
+        if (primaryKeyLivenessInfo().isLive(nowInSec))
+            return true;
+
+        for (Cell cell : this)
+            if (cell.isLive(nowInSec))
+                return true;
+
+        return false;
+    }
+
+    public boolean isEmpty()
+    {
+        return !primaryKeyLivenessInfo().hasTimestamp()
+            && deletion().isLive()
+            && !iterator().hasNext()
+            && !hasComplexDeletion();
+    }
+
+    public boolean isStatic()
+    {
+        return clustering() == Clustering.STATIC_CLUSTERING;
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithByte(digest, kind().ordinal());
+        clustering().digest(digest);
+
+        deletion().digest(digest);
+        primaryKeyLivenessInfo().digest(digest);
+
+        Iterator<ColumnDefinition> iter = columns().complexColumns();
+        while (iter.hasNext())
+            getDeletion(iter.next()).digest(digest);
+
+        for (Cell cell : this)
+            cell.digest(digest);
+    }
+
+    /**
+     * Copy this row to the provided writer.
+     *
+     * @param writer the row writer to write this row to.
+     */
+    public void copyTo(Row.Writer writer)
+    {
+        Rows.writeClustering(clustering(), writer);
+        writer.writePartitionKeyLivenessInfo(primaryKeyLivenessInfo());
+        writer.writeRowDeletion(deletion());
+
+        for (Cell cell : this)
+            cell.writeTo(writer);
+
+        for (int i = 0; i < columns().complexColumnCount(); i++)
+        {
+            ColumnDefinition c = columns().getComplex(i);
+            DeletionTime dt = getDeletion(c);
+            if (!dt.isLive())
+                writer.writeComplexDeletion(c, dt);
+        }
+        writer.endOfRow();
+    }
+
+    public void validateData(CFMetaData metadata)
+    {
+        Clustering clustering = clustering();
+        for (int i = 0; i < clustering.size(); i++)
+        {
+            ByteBuffer value = clustering.get(i);
+            if (value != null)
+                metadata.comparator.subtype(i).validate(value);
+        }
+
+        primaryKeyLivenessInfo().validate();
+        if (deletion().localDeletionTime() < 0)
+            throw new MarshalException("A local deletion time should not be negative");
+
+        for (Cell cell : this)
+            cell.validate();
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        return toString(metadata, false);
+    }
+
+    public String toString(CFMetaData metadata, boolean fullDetails)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Row");
+        if (fullDetails)
+        {
+            sb.append("[info=").append(primaryKeyLivenessInfo());
+            if (!deletion().isLive())
+                sb.append(" del=").append(deletion());
+            sb.append(" ]");
+        }
+        sb.append(": ").append(clustering().toString(metadata)).append(" | ");
+        boolean isFirst = true;
+        ColumnDefinition prevColumn = null;
+        for (Cell cell : this)
+        {
+            if (isFirst) isFirst = false; else sb.append(", ");
+            if (fullDetails)
+            {
+                if (cell.column().isComplex() && !cell.column().equals(prevColumn))
+                {
+                    DeletionTime complexDel = getDeletion(cell.column());
+                    if (!complexDel.isLive())
+                        sb.append("del(").append(cell.column().name).append(")=").append(complexDel).append(", ");
+                }
+                sb.append(cell);
+                prevColumn = cell.column();
+            }
+            else
+            {
+                sb.append(cell.column().name);
+                if (cell.column().type instanceof CollectionType)
+                {
+                    CollectionType ct = (CollectionType)cell.column().type;
+                    sb.append("[").append(ct.nameComparator().getString(cell.path().get(0))).append("]");
+                    sb.append("=").append(ct.valueComparator().getString(cell.value()));
+                }
+                else
+                {
+                    sb.append("=").append(cell.column().type.getString(cell.value()));
+                }
+            }
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof Row))
+            return false;
+
+        Row that = (Row)other;
+        if (!this.clustering().equals(that.clustering())
+             || !this.columns().equals(that.columns())
+             || !this.primaryKeyLivenessInfo().equals(that.primaryKeyLivenessInfo())
+             || !this.deletion().equals(that.deletion()))
+            return false;
+
+        Iterator<Cell> thisCells = this.iterator();
+        Iterator<Cell> thatCells = that.iterator();
+        while (thisCells.hasNext())
+        {
+            if (!thatCells.hasNext() || !thisCells.next().equals(thatCells.next()))
+                return false;
+        }
+        return !thatCells.hasNext();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int hash = Objects.hash(clustering(), columns(), primaryKeyLivenessInfo(), deletion());
+        for (Cell cell : this)
+            hash += 31 * cell.hashCode();
+        return hash;
+    }
+}