You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:44 UTC

[20/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/MemtableRowData.java b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
new file mode 100644
index 0000000..cad0765
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Row data stored inside a memtable.
+ *
+ * This has methods like dataSize and unsharedHeapSizeExcludingData that are
+ * specific to memtables.
+ */
+public interface MemtableRowData extends Clusterable
+{
+    public Columns columns();
+
+    public int dataSize();
+
+    // returns the size of the Row and all references on the heap, excluding any costs associated with byte arrays
+    // that would be allocated by a clone operation, as these will be accounted for by the allocator
+    public long unsharedHeapSizeExcludingData();
+
+    public interface ReusableRow extends Row
+    {
+        public ReusableRow setTo(MemtableRowData rowData);
+    }
+
+    public class BufferRowData implements MemtableRowData
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferRowData(null, LivenessInfo.NONE, DeletionTime.LIVE, null));
+
+        private final Clustering clustering;
+        private final LivenessInfo livenessInfo;
+        private final DeletionTime deletion;
+        private final RowDataBlock dataBlock;
+
+        public BufferRowData(Clustering clustering, LivenessInfo livenessInfo, DeletionTime deletion, RowDataBlock dataBlock)
+        {
+            this.clustering = clustering;
+            this.livenessInfo = livenessInfo.takeAlias();
+            this.deletion = deletion.takeAlias();
+            this.dataBlock = dataBlock;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        public Columns columns()
+        {
+            return dataBlock.columns();
+        }
+
+        public int dataSize()
+        {
+            return clustering.dataSize() + livenessInfo.dataSize() + deletion.dataSize() + dataBlock.dataSize();
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE
+                 + (clustering == Clustering.STATIC_CLUSTERING ? 0 : ((BufferClustering)clustering).unsharedHeapSizeExcludingData())
+                 + dataBlock.unsharedHeapSizeExcludingData();
+        }
+
+        public static ReusableRow createReusableRow()
+        {
+            return new BufferRow();
+        }
+
+        private static class BufferRow extends AbstractReusableRow implements ReusableRow
+        {
+            private BufferRowData rowData;
+
+            private BufferRow()
+            {
+            }
+
+            public ReusableRow setTo(MemtableRowData rowData)
+            {
+                assert rowData instanceof BufferRowData;
+                this.rowData = (BufferRowData)rowData;
+                return this;
+            }
+
+            protected RowDataBlock data()
+            {
+                return rowData.dataBlock;
+            }
+
+            protected int row()
+            {
+                return 0;
+            }
+
+            public Clustering clustering()
+            {
+                return rowData.clustering;
+            }
+
+            public LivenessInfo primaryKeyLivenessInfo()
+            {
+                return rowData.livenessInfo;
+            }
+
+            public DeletionTime deletion()
+            {
+                return rowData.deletion;
+            }
+        }
+    }
+
+    public class BufferClustering extends Clustering
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferClustering(0));
+
+        private final ByteBuffer[] values;
+
+        public BufferClustering(int size)
+        {
+            this.values = new ByteBuffer[size];
+        }
+
+        public void setClusteringValue(int i, ByteBuffer value)
+        {
+            values[i] = value;
+        }
+
+        public int size()
+        {
+            return values.length;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            return values[i];
+        }
+
+        public ByteBuffer[] getRawValues()
+        {
+            return values;
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+        }
+
+        public Clustering takeAlias()
+        {
+            return this;
+        }
+    }
+
+    public class BufferCellPath extends CellPath.SimpleCellPath
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCellPath(new ByteBuffer[0]));
+
+        private BufferCellPath(ByteBuffer[] values)
+        {
+            super(values);
+        }
+
+        public static BufferCellPath clone(CellPath path, AbstractAllocator allocator)
+        {
+            int size = path.size();
+            ByteBuffer[] values = new ByteBuffer[size];
+            for (int i = 0; i < size; i++)
+                values[i] = allocator.clone(path.get(0));
+            return new BufferCellPath(values);
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
new file mode 100644
index 0000000..b5ac19b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A range tombstone marker that indicates the bound of a range tombstone (start or end).
+ */
+public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
+{
+    private final DeletionTime deletion;
+
+    public RangeTombstoneBoundMarker(RangeTombstone.Bound bound, DeletionTime deletion)
+    {
+        super(bound);
+        assert bound.kind().isBound();
+        this.deletion = deletion;
+    }
+
+    public RangeTombstoneBoundMarker(Slice.Bound bound, DeletionTime deletion)
+    {
+        this(new RangeTombstone.Bound(bound.kind(), bound.getRawValues()), deletion);
+    }
+
+    public static RangeTombstoneBoundMarker inclusiveStart(ClusteringPrefix clustering, DeletionTime deletion)
+    {
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_START_BOUND, clustering.getRawValues()), deletion);
+    }
+
+    public static RangeTombstoneBoundMarker inclusiveEnd(ClusteringPrefix clustering, DeletionTime deletion)
+    {
+        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_END_BOUND, clustering.getRawValues()), deletion);
+    }
+
+    public static RangeTombstoneBoundMarker inclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveOpen(reversed, boundValues);
+        return new RangeTombstoneBoundMarker(bound, deletion);
+    }
+
+    public static RangeTombstoneBoundMarker exclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveOpen(reversed, boundValues);
+        return new RangeTombstoneBoundMarker(bound, deletion);
+    }
+
+    public static RangeTombstoneBoundMarker inclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveClose(reversed, boundValues);
+        return new RangeTombstoneBoundMarker(bound, deletion);
+    }
+
+    public static RangeTombstoneBoundMarker exclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveClose(reversed, boundValues);
+        return new RangeTombstoneBoundMarker(bound, deletion);
+    }
+
+    public boolean isBoundary()
+    {
+        return false;
+    }
+
+    /**
+     * The deletion time for the range tombstone this is a bound of.
+     */
+    public DeletionTime deletionTime()
+    {
+        return deletion;
+    }
+
+    public boolean isOpen(boolean reversed)
+    {
+        return bound.kind().isOpen(reversed);
+    }
+
+    public boolean isClose(boolean reversed)
+    {
+        return bound.kind().isClose(reversed);
+    }
+
+    public DeletionTime openDeletionTime(boolean reversed)
+    {
+        if (!isOpen(reversed))
+            throw new IllegalStateException();
+        return deletion;
+    }
+
+    public DeletionTime closeDeletionTime(boolean reversed)
+    {
+        if (isOpen(reversed))
+            throw new IllegalStateException();
+        return deletion;
+    }
+
+    public void copyTo(RangeTombstoneMarker.Writer writer)
+    {
+        copyBoundTo(writer);
+        writer.writeBoundDeletion(deletion);
+        writer.endOfMarker();
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        bound.digest(digest);
+        deletion.digest(digest);
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Marker ");
+        sb.append(bound.toString(metadata));
+        sb.append("@").append(deletion.markedForDeleteAt());
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof RangeTombstoneBoundMarker))
+            return false;
+
+        RangeTombstoneBoundMarker that = (RangeTombstoneBoundMarker)other;
+        return this.bound.equals(that.bound)
+            && this.deletion.equals(that.deletion);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(bound, deletion);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
new file mode 100644
index 0000000..1140d40
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A range tombstone marker that represents a boundary between 2 range tombstones (i.e. it closes one range and open another).
+ */
+public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
+{
+    private final DeletionTime endDeletion;
+    private final DeletionTime startDeletion;
+
+    public RangeTombstoneBoundaryMarker(RangeTombstone.Bound bound, DeletionTime endDeletion, DeletionTime startDeletion)
+    {
+        super(bound);
+        assert bound.kind().isBoundary();
+        this.endDeletion = endDeletion;
+        this.startDeletion = startDeletion;
+    }
+
+    public static RangeTombstoneBoundaryMarker exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveCloseInclusiveOpen(reversed, boundValues);
+        DeletionTime endDeletion = reversed ? openDeletion : closeDeletion;
+        DeletionTime startDeletion = reversed ? closeDeletion : openDeletion;
+        return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion);
+    }
+
+    public static RangeTombstoneBoundaryMarker inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion)
+    {
+        RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveCloseExclusiveOpen(reversed, boundValues);
+        DeletionTime endDeletion = reversed ? openDeletion : closeDeletion;
+        DeletionTime startDeletion = reversed ? closeDeletion : openDeletion;
+        return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion);
+    }
+
+    public boolean isBoundary()
+    {
+        return true;
+    }
+
+    /**
+     * The deletion time for the range tombstone this boundary ends (in clustering order).
+     */
+    public DeletionTime endDeletionTime()
+    {
+        return endDeletion;
+    }
+
+    /**
+     * The deletion time for the range tombstone this boundary starts (in clustering order).
+     */
+    public DeletionTime startDeletionTime()
+    {
+        return startDeletion;
+    }
+
+    public DeletionTime closeDeletionTime(boolean reversed)
+    {
+        return reversed ? startDeletion : endDeletion;
+    }
+
+    public DeletionTime openDeletionTime(boolean reversed)
+    {
+        return reversed ? endDeletion : startDeletion;
+    }
+
+    public boolean isOpen(boolean reversed)
+    {
+        // A boundary always open one side
+        return true;
+    }
+
+    public boolean isClose(boolean reversed)
+    {
+        // A boundary always close one side
+        return true;
+    }
+
+    public static boolean isBoundary(ClusteringComparator comparator, Slice.Bound close, Slice.Bound open)
+    {
+        if (!comparator.isOnSameClustering(close, open))
+            return false;
+
+        // If both bound are exclusive, then it's not a boundary, otherwise it is one.
+        // Note that most code should never call this with 2 inclusive bound: this would mean we had
+        // 2 RTs that were overlapping and RangeTombstoneList don't create that. However, old
+        // code was generating that so supporting this case helps dealing with backward compatibility.
+        return close.isInclusive() || open.isInclusive();
+    }
+
+    // Please note that isBoundary *must* have been called (and returned true) before this is called.
+    public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion)
+    {
+        boolean isExclusiveClose = close.isExclusive() || (close.isInclusive() && open.isInclusive() && openDeletion.supersedes(closeDeletion));
+        return isExclusiveClose
+             ? exclusiveCloseInclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion)
+             : inclusiveCloseExclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion);
+    }
+
+    public RangeTombstoneBoundMarker createCorrespondingCloseBound(boolean reversed)
+    {
+        return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().closeBoundOfBoundary(reversed)), endDeletion);
+    }
+
+    public RangeTombstoneBoundMarker createCorrespondingOpenBound(boolean reversed)
+    {
+        return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().openBoundOfBoundary(reversed)), startDeletion);
+    }
+
+    public void copyTo(RangeTombstoneMarker.Writer writer)
+    {
+        copyBoundTo(writer);
+        writer.writeBoundaryDeletion(endDeletion, startDeletion);
+        writer.endOfMarker();
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        bound.digest(digest);
+        endDeletion.digest(digest);
+        startDeletion.digest(digest);
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Marker ");
+        sb.append(bound.toString(metadata));
+        sb.append("@").append(endDeletion.markedForDeleteAt()).append("-").append(startDeletion.markedForDeleteAt());
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof RangeTombstoneBoundaryMarker))
+            return false;
+
+        RangeTombstoneBoundaryMarker that = (RangeTombstoneBoundaryMarker)other;
+        return this.bound.equals(that.bound)
+            && this.endDeletion.equals(that.endDeletion)
+            && this.startDeletion.equals(that.startDeletion);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(bound, endDeletion, startDeletion);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
new file mode 100644
index 0000000..1a506d5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * A marker for a range tombstone bound.
+ * <p>
+ * There is 2 types of markers: bounds (see {@link RangeTombstoneBound}) and boundaries (see {@link RangeTombstoneBoundary}).
+ */
+public interface RangeTombstoneMarker extends Unfiltered
+{
+    @Override
+    public RangeTombstone.Bound clustering();
+
+    public boolean isBoundary();
+
+    public void copyTo(RangeTombstoneMarker.Writer writer);
+
+    public boolean isOpen(boolean reversed);
+    public boolean isClose(boolean reversed);
+    public DeletionTime openDeletionTime(boolean reversed);
+    public DeletionTime closeDeletionTime(boolean reversed);
+
+    public interface Writer extends Slice.Bound.Writer
+    {
+        public void writeBoundDeletion(DeletionTime deletion);
+        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion);
+        public void endOfMarker();
+    }
+
+    public static class Builder implements Writer
+    {
+        private final ByteBuffer[] values;
+        private int size;
+
+        private RangeTombstone.Bound.Kind kind;
+        private DeletionTime firstDeletion;
+        private DeletionTime secondDeletion;
+
+        public Builder(int maxClusteringSize)
+        {
+            this.values = new ByteBuffer[maxClusteringSize];
+        }
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            values[size++] = value;
+        }
+
+        public void writeBoundKind(RangeTombstone.Bound.Kind kind)
+        {
+            this.kind = kind;
+        }
+
+        public void writeBoundDeletion(DeletionTime deletion)
+        {
+            firstDeletion = deletion;
+        }
+
+        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
+        {
+            firstDeletion = endDeletion;
+            secondDeletion = startDeletion;
+        }
+
+        public void endOfMarker()
+        {
+        }
+
+        public RangeTombstoneMarker build()
+        {
+            assert kind != null : "Nothing has been written";
+            if (kind.isBoundary())
+                return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion, secondDeletion);
+            else
+                return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion);
+        }
+
+        public Builder reset()
+        {
+            Arrays.fill(values, null);
+            size = 0;
+            kind = null;
+            return this;
+        }
+    }
+
+    /**
+     * Utility class to help merging range tombstone markers coming from multiple inputs (UnfilteredRowIterators).
+     * <p>
+     * The assumption that each individual input must validate and that we must preserve in the output is that every
+     * open marker has a corresponding close marker with the exact same deletion info, and that there is no other range
+     * tombstone marker between those open and close marker (of course, they could be rows in between). In other word,
+     * for any {@code UnfilteredRowIterator}, you only ever have to remenber the last open marker (if any) to have the
+     * full picture of what is deleted by range tombstones at any given point of iterating that iterator.
+     * <p>
+     * Note that this class can merge both forward and reverse iterators. To deal with reverse, we just reverse how we
+     * deal with open and close markers (in forward order, we'll get open-close, open-close, ..., while in reverse we'll
+     * get close-open, close-open, ...).
+     */
+    public static class Merger
+    {
+        // Boundaries sorts like the bound that have their equivalent "inclusive" part and that's the main action we
+        // care about as far as merging goes. So MergedKind just group those as the same case, and tell us whether
+        // we're dealing with an open or a close (based on whether we're dealing with reversed iterators or not).
+        // Really this enum is just a convenience for merging.
+        private enum MergedKind
+        {
+            INCL_OPEN, EXCL_CLOSE, EXCL_OPEN, INCL_CLOSE;
+
+            public static MergedKind forBound(RangeTombstone.Bound bound, boolean reversed)
+            {
+                switch (bound.kind())
+                {
+                    case INCL_START_BOUND:
+                    case EXCL_END_INCL_START_BOUNDARY:
+                        return reversed ? INCL_CLOSE : INCL_OPEN;
+                    case EXCL_END_BOUND:
+                        return reversed ? EXCL_OPEN : EXCL_CLOSE;
+                    case EXCL_START_BOUND:
+                        return reversed ? EXCL_CLOSE : EXCL_OPEN;
+                    case INCL_END_EXCL_START_BOUNDARY:
+                    case INCL_END_BOUND:
+                        return reversed ? INCL_OPEN : INCL_CLOSE;
+                }
+                throw new AssertionError();
+            }
+        }
+
+        private final CFMetaData metadata;
+        private final UnfilteredRowIterators.MergeListener listener;
+        private final DeletionTime partitionDeletion;
+        private final boolean reversed;
+
+        private RangeTombstone.Bound bound;
+        private final RangeTombstoneMarker[] markers;
+
+        // For each iterator, what is the currently open marker deletion time (or null if there is no open marker on that iterator)
+        private final DeletionTime[] openMarkers;
+        // The index in openMarkers of the "biggest" marker, the one with the biggest deletion time. Is < 0 iff there is no open
+        // marker on any iterator.
+        private int biggestOpenMarker = -1;
+
+        public Merger(CFMetaData metadata, int size, DeletionTime partitionDeletion, boolean reversed, UnfilteredRowIterators.MergeListener listener)
+        {
+            this.metadata = metadata;
+            this.listener = listener;
+            this.partitionDeletion = partitionDeletion;
+            this.reversed = reversed;
+
+            this.markers = new RangeTombstoneMarker[size];
+            this.openMarkers = new DeletionTime[size];
+        }
+
+        public void clear()
+        {
+            Arrays.fill(markers, null);
+        }
+
+        public void add(int i, RangeTombstoneMarker marker)
+        {
+            bound = marker.clustering();
+            markers[i] = marker;
+        }
+
+        public RangeTombstoneMarker merge()
+        {
+            /*
+             * Merging of range tombstones works this way:
+             *   1) We remember what is the currently open marker in the merged stream
+             *   2) We update our internal states of what range is opened on the input streams based on the new markers to merge
+             *   3) We compute what should be the state in the merge stream after 2)
+             *   4) We return what marker should be issued on the merged stream based on the difference between the state from 1) and 3)
+             */
+
+            DeletionTime previousDeletionTimeInMerged = currentOpenDeletionTimeInMerged();
+
+            updateOpenMarkers();
+
+            DeletionTime newDeletionTimeInMerged = currentOpenDeletionTimeInMerged();
+            if (previousDeletionTimeInMerged.equals(newDeletionTimeInMerged))
+                return null;
+
+            ByteBuffer[] values = bound.getRawValues();
+
+            RangeTombstoneMarker merged;
+            switch (MergedKind.forBound(bound, reversed))
+            {
+                case INCL_OPEN:
+                    merged = previousDeletionTimeInMerged.isLive()
+                           ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged)
+                           : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+                    break;
+                case EXCL_CLOSE:
+                    merged = newDeletionTimeInMerged.isLive()
+                           ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged)
+                           : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+                    break;
+                case EXCL_OPEN:
+                    merged = previousDeletionTimeInMerged.isLive()
+                           ? RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged)
+                           : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+                    break;
+                case INCL_CLOSE:
+                    merged = newDeletionTimeInMerged.isLive()
+                           ? RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged)
+                           : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+
+            if (listener != null)
+                listener.onMergedRangeTombstoneMarkers(merged, markers);
+
+            return merged;
+        }
+
+        private DeletionTime currentOpenDeletionTimeInMerged()
+        {
+            if (biggestOpenMarker < 0)
+                return DeletionTime.LIVE;
+
+            DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
+            // it's only open in the merged iterator if it's not shadowed by the partition level deletion
+            return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime.takeAlias();
+        }
+
+        private void updateOpenMarkers()
+        {
+            for (int i = 0; i < markers.length; i++)
+            {
+                RangeTombstoneMarker marker = markers[i];
+                if (marker == null)
+                    continue;
+
+                // Note that we can have boundaries that are both open and close, but in that case all we care about
+                // is what it the open deletion after the marker, so we favor the opening part in this case.
+                if (marker.isOpen(reversed))
+                    openMarkers[i] = marker.openDeletionTime(reversed).takeAlias();
+                else
+                    openMarkers[i] = null;
+            }
+
+            // Recompute what is now the biggest open marker
+            biggestOpenMarker = -1;
+            for (int i = 0; i < openMarkers.length; i++)
+            {
+                if (openMarkers[i] != null && (biggestOpenMarker < 0 || openMarkers[i].supersedes(openMarkers[biggestOpenMarker])))
+                    biggestOpenMarker = i;
+            }
+        }
+
+        public DeletionTime activeDeletion()
+        {
+            DeletionTime openMarker = currentOpenDeletionTimeInMerged();
+            // We only have an open marker in the merged stream if it's not shadowed by the partition deletion (which can be LIVE itself), so
+            // if have an open marker, we know it's the "active" deletion for the merged stream.
+            return openMarker.isLive() ? partitionDeletion : openMarker;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ReusableRow.java b/src/java/org/apache/cassandra/db/rows/ReusableRow.java
new file mode 100644
index 0000000..0135afc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ReusableRow.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+
+public class ReusableRow extends AbstractReusableRow
+{
+    private final ReusableClustering clustering;
+
+    private final ReusableLivenessInfo liveness = new ReusableLivenessInfo();
+
+    private DeletionTime deletion = DeletionTime.LIVE;
+
+    private final RowDataBlock data;
+    private final Writer writer;
+
+    public ReusableRow(int clusteringSize, Columns columns, boolean inOrderCells, boolean isCounter)
+    {
+        this.clustering = new ReusableClustering(clusteringSize);
+        this.data = new RowDataBlock(columns, 1, false, isCounter);
+        this.writer = new Writer(data, inOrderCells);
+    }
+
+    protected RowDataBlock data()
+    {
+        return data;
+    }
+
+    protected int row()
+    {
+        return 0;
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return liveness;
+    }
+
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    public Row.Writer writer()
+    {
+        return writer.reset();
+    }
+
+    private class Writer extends RowDataBlock.Writer
+    {
+        public Writer(RowDataBlock data, boolean inOrderCells)
+        {
+            super(data, inOrderCells);
+        }
+
+        public void writeClusteringValue(ByteBuffer buffer)
+        {
+            clustering.writer().writeClusteringValue(buffer);
+        }
+
+        public void writePartitionKeyLivenessInfo(LivenessInfo info)
+        {
+            ReusableRow.this.liveness.setTo(info);
+        }
+
+        public void writeRowDeletion(DeletionTime deletion)
+        {
+            ReusableRow.this.deletion = deletion;
+        }
+
+        @Override
+        public Writer reset()
+        {
+            super.reset();
+            clustering.reset();
+            liveness.reset();
+            deletion = DeletionTime.LIVE;
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
new file mode 100644
index 0000000..545da7a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Storage engine representation of a row.
+ *
+ * A row is identified by it's clustering column values (it's an Unfiltered),
+ * has row level informations (deletion and partition key liveness infos (see below))
+ * and contains data (Cells) regarding the columns it contains.
+ *
+ * A row implements {@code WithLivenessInfo} and has thus a timestamp, ttl and
+ * local deletion time. Those information do not apply to the row content, they
+ * apply to the partition key columns. In other words, the timestamp is the
+ * timestamp for the partition key columns: it is what allows to distinguish
+ * between a dead row, and a live row but for which only the partition key columns
+ * are set. The ttl and local deletion time information are for the case where
+ * a TTL is set on those partition key columns. Note however that a row can have
+ * live cells but no partition key columns timestamp, because said timestamp (and
+ * its corresponding ttl) is only set on INSERT (not UPDATE).
+ */
+public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row>
+{
+    /**
+     * The clustering values for this row.
+     */
+    @Override
+    public Clustering clustering();
+
+    /**
+     * The columns this row contains.
+     *
+     * Note that this is actually a superset of the columns the row contains. The row
+     * may not have values for each of those columns, but it can't have values for other
+     * columns.
+     *
+     * @return a superset of the columns contained in this row.
+     */
+    public Columns columns();
+
+    /**
+     * The row deletion.
+     *
+     * This correspond to the last row deletion done on this row.
+     *
+     * @return the row deletion.
+     */
+    public DeletionTime deletion();
+
+    /**
+     * Liveness information for the primary key columns of this row.
+     * <p>
+     * As a row is uniquely identified by its primary key, all its primary key columns
+     * share the same {@code LivenessInfo}. This liveness information is what allows us
+     * to distinguish between a dead row (it has no live cells and its primary key liveness
+     * info has no timestamp) and a live row but where all non PK columns are null (it has no
+     * live cells, but its primary key liveness has a timestamp). Please note that the ttl
+     * (and local deletion time) of the PK liveness information only apply to the
+     * liveness info timestamp, and not to the content of the row. Also note that because
+     * in practice there is not way to only delete the primary key columns (without deleting
+     * the row itself), the returned {@code LivenessInfo} can only have a local deletion time
+     * if it has a TTL.
+     * <p>
+     * Lastly, note that it is possible for a row to have live cells but no PK liveness
+     * info timestamp, because said timestamp is only set on {@code INSERT} (which makes sense
+     * in itself, see #6782) but live cells can be add through {@code UPDATE} even if the row
+     * wasn't pre-existing (which users are encouraged not to do, but we can't validate).
+     */
+    public LivenessInfo primaryKeyLivenessInfo();
+
+    /**
+     * Whether the row correspond to a static row or not.
+     *
+     * @return whether the row correspond to a static row or not.
+     */
+    public boolean isStatic();
+
+    /**
+     * Whether the row has no information whatsoever. This means no row infos
+     * (timestamp, ttl, deletion), no cells and no complex deletion info.
+     *
+     * @return {@code true} if the row has no data whatsoever, {@code false} otherwise.
+     */
+    public boolean isEmpty();
+
+    /**
+     * Whether the row has some live information (i.e. it's not just deletion informations).
+     */
+    public boolean hasLiveData(int nowInSec);
+
+    /**
+     * Whether or not this row contains any deletion for a complex column. That is if
+     * there is at least one column for which {@code getDeletion} returns a non
+     * live deletion time.
+     */
+    public boolean hasComplexDeletion();
+
+    /**
+     * Returns a cell for a simple column.
+     *
+     * Calls to this method are allowed to return the same Cell object, and hence the returned
+     * object is only valid until the next getCell/getCells call on the same Row object. You will need
+     * to copy the returned data if you plan on using a reference to the Cell object
+     * longer than that.
+     *
+     * @param c the simple column for which to fetch the cell.
+     * @return the corresponding cell or {@code null} if the row has no such cell.
+     */
+    public Cell getCell(ColumnDefinition c);
+
+    /**
+     * Return a cell for a given complex column and cell path.
+     *
+     * Calls to this method are allowed to return the same Cell object, and hence the returned
+     * object is only valid until the next getCell/getCells call on the same Row object. You will need
+     * to copy the returned data if you plan on using a reference to the Cell object
+     * longer than that.
+     *
+     * @param c the complex column for which to fetch the cell.
+     * @param path the cell path for which to fetch the cell.
+     * @return the corresponding cell or {@code null} if the row has no such cell.
+     */
+    public Cell getCell(ColumnDefinition c, CellPath path);
+
+    /**
+     * Returns an iterator on the cells of a complex column c.
+     *
+     * Calls to this method are allowed to return the same iterator object, and
+     * hence the returned object is only valid until the next getCell/getCells call
+     * on the same Row object. You will need to copy the returned data if you
+     * plan on using a reference to the Cell object longer than that.
+     *
+     * @param c the complex column for which to fetch the cells.
+     * @return an iterator on the cells of complex column {@code c} or {@code null} if the row has no
+     * cells for that column.
+     */
+    public Iterator<Cell> getCells(ColumnDefinition c);
+
+    /**
+     * Deletion informations for complex columns.
+     *
+     * @param c the complex column for which to fetch deletion info.
+     * @return the deletion time for complex column {@code c} in this row.
+     */
+    public DeletionTime getDeletion(ColumnDefinition c);
+
+    /**
+     * An iterator over the cells of this row.
+     *
+     * The iterator guarantees that for 2 rows of the same partition, columns
+     * are returned in a consistent order in the sense that if the cells for
+     * column c1 is returned before the cells for column c2 by the first iterator,
+     * it is also the case for the 2nd iterator.
+     *
+     * The object returned by a call to next() is only guaranteed to be valid until
+     * the next call to hasNext() or next(). If a consumer wants to keep a
+     * reference on the returned Cell objects for longer than the iteration, it must
+     * make a copy of it explicitly.
+     *
+     * @return an iterator over the cells of this row.
+     */
+    public Iterator<Cell> iterator();
+
+    /**
+     * An iterator to efficiently search data for a given column.
+     *
+     * @return a search iterator for the cells of this row.
+     */
+    public SearchIterator<ColumnDefinition, ColumnData> searchIterator();
+
+    /**
+     * Copy this row to the provided writer.
+     *
+     * @param writer the row writer to write this row to.
+     */
+    public void copyTo(Row.Writer writer);
+
+    public String toString(CFMetaData metadata, boolean fullDetails);
+
+    /**
+     * Interface for writing a row.
+     * <p>
+     * Clients of this interface should abid to the following assumptions:
+     *   1) if the row has a non empty clustering (it's not a static one and it doesn't belong to a table without
+     *      clustering columns), then that clustering should be the first thing written (through
+     *      {@link ClusteringPrefix.Writer#writeClusteringValue})).
+     *   2) for a given complex column, calls to {@link #writeCell} are performed consecutively (without
+     *      any call to {@code writeCell} for another column intermingled) and in {@code CellPath} order.
+     *   3) {@link #endOfRow} is always called to end the writing of a given row.
+     */
+    public interface Writer extends ClusteringPrefix.Writer
+    {
+        /**
+         * Writes the livness information for the partition key columns of this row.
+         *
+         * This call is optional: skipping it is equivalent to calling {@code writePartitionKeyLivenessInfo(LivenessInfo.NONE)}.
+         *
+         * @param info the liveness information for the partition key columns of the written row.
+         */
+        public void writePartitionKeyLivenessInfo(LivenessInfo info);
+
+        /**
+         * Writes the deletion information for this row.
+         *
+         * This call is optional and can be skipped if the row is not deleted.
+         *
+         * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted.
+         */
+        public void writeRowDeletion(DeletionTime deletion);
+
+        /**
+         * Writes a cell to the writer.
+         *
+         * As mentionned above, add cells for a given column should be added consecutively (and in {@code CellPath} order for complex columns).
+         *
+         * @param column the column for the written cell.
+         * @param isCounter whether or not this is a counter cell.
+         * @param value the value for the cell. For tombstones, which don't have values, this should be an empty buffer.
+         * @param info the cell liveness information.
+         * @param path the {@link CellPath} for complex cells and {@code null} for regular cells.
+         */
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path);
+
+        /**
+         * Writes a deletion for a complex column, that is one that apply to all cells of the complex column.
+         *
+         * @param column the (complex) column this is a deletion for.
+         * @param complexDeletion the deletion time.
+         */
+        public void writeComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion);
+
+        /**
+         * Should be called to indicates that the row has been fully written.
+         */
+        public void endOfRow();
+    }
+
+    /**
+     * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators).
+     */
+    public abstract static class Merger
+    {
+        private final CFMetaData metadata;
+        private final int nowInSec;
+        private final UnfilteredRowIterators.MergeListener listener;
+        private final Columns columns;
+
+        private Clustering clustering;
+        private final Row[] rows;
+        private int rowsToMerge;
+
+        private LivenessInfo rowInfo = LivenessInfo.NONE;
+        private DeletionTime rowDeletion = DeletionTime.LIVE;
+
+        private final Cell[] cells;
+        private final List<Iterator<Cell>> complexCells;
+        private final ComplexColumnReducer complexReducer = new ComplexColumnReducer();
+
+        // For the sake of the listener if there is one
+        private final DeletionTime[] complexDelTimes;
+
+        private boolean signaledListenerForRow;
+
+        public static Merger createStatic(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+        {
+            return new StaticMerger(metadata, size, nowInSec, columns, listener);
+        }
+
+        public static Merger createRegular(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+        {
+            return new RegularMerger(metadata, size, nowInSec, columns, listener);
+        }
+
+        protected Merger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+        {
+            this.metadata = metadata;
+            this.nowInSec = nowInSec;
+            this.listener = listener;
+            this.columns = columns;
+            this.rows = new Row[size];
+            this.complexCells = new ArrayList<>(size);
+
+            this.cells = new Cell[size];
+            this.complexDelTimes = listener == null ? null : new DeletionTime[size];
+        }
+
+        public void clear()
+        {
+            Arrays.fill(rows, null);
+            Arrays.fill(cells, null);
+            if (complexDelTimes != null)
+                Arrays.fill(complexDelTimes, null);
+            complexCells.clear();
+            rowsToMerge = 0;
+
+            rowInfo = LivenessInfo.NONE;
+            rowDeletion = DeletionTime.LIVE;
+
+            signaledListenerForRow = false;
+        }
+
+        public void add(int i, Row row)
+        {
+            clustering = row.clustering();
+            rows[i] = row;
+            ++rowsToMerge;
+        }
+
+        protected abstract Row.Writer getWriter();
+        protected abstract Row getRow();
+
+        public Row merge(DeletionTime activeDeletion)
+        {
+            // If for this clustering we have only one row version and have no activeDeletion (i.e. nothing to filter out),
+            // then we can just return that single row (we also should have no listener)
+            if (rowsToMerge == 1 && activeDeletion.isLive() && listener == null)
+            {
+                for (int i = 0; i < rows.length; i++)
+                    if (rows[i] != null)
+                        return rows[i];
+                throw new AssertionError();
+            }
+
+            Row.Writer writer = getWriter();
+            Rows.writeClustering(clustering, writer);
+
+            for (int i = 0; i < rows.length; i++)
+            {
+                if (rows[i] == null)
+                    continue;
+
+                rowInfo = rowInfo.mergeWith(rows[i].primaryKeyLivenessInfo());
+
+                if (rows[i].deletion().supersedes(rowDeletion))
+                    rowDeletion = rows[i].deletion();
+            }
+
+            if (rowDeletion.supersedes(activeDeletion))
+                activeDeletion = rowDeletion;
+
+            if (activeDeletion.deletes(rowInfo))
+                rowInfo = LivenessInfo.NONE;
+
+            writer.writePartitionKeyLivenessInfo(rowInfo);
+            writer.writeRowDeletion(rowDeletion);
+
+            for (int i = 0; i < columns.simpleColumnCount(); i++)
+            {
+                ColumnDefinition c = columns.getSimple(i);
+                for (int j = 0; j < rows.length; j++)
+                    cells[j] = rows[j] == null ? null : rows[j].getCell(c);
+
+                reconcileCells(activeDeletion, writer);
+            }
+
+            complexReducer.activeDeletion = activeDeletion;
+            complexReducer.writer = writer;
+            for (int i = 0; i < columns.complexColumnCount(); i++)
+            {
+                ColumnDefinition c = columns.getComplex(i);
+
+                DeletionTime maxComplexDeletion = DeletionTime.LIVE;
+                for (int j = 0; j < rows.length; j++)
+                {
+                    if (rows[j] == null)
+                        continue;
+
+                    DeletionTime dt = rows[j].getDeletion(c);
+                    if (complexDelTimes != null)
+                        complexDelTimes[j] = dt;
+
+                    if (dt.supersedes(maxComplexDeletion))
+                        maxComplexDeletion = dt;
+                }
+
+                boolean overrideActive = maxComplexDeletion.supersedes(activeDeletion);
+                maxComplexDeletion =  overrideActive ? maxComplexDeletion : DeletionTime.LIVE;
+                writer.writeComplexDeletion(c, maxComplexDeletion);
+                if (listener != null)
+                    listener.onMergedComplexDeletion(c, maxComplexDeletion, complexDelTimes);
+
+                mergeComplex(overrideActive ? maxComplexDeletion : activeDeletion, c);
+            }
+            writer.endOfRow();
+
+            Row row = getRow();
+            // Because shadowed cells are skipped, the row could be empty. In which case
+            // we return null (we also don't want to signal anything in that case since that
+            // means everything in the row was shadowed and the listener will have been signalled
+            // for whatever shadows it).
+            if (row.isEmpty())
+                return null;
+
+            maybeSignalEndOfRow();
+            return row;
+        }
+
+        private void maybeSignalListenerForRow()
+        {
+            if (listener != null && !signaledListenerForRow)
+            {
+                listener.onMergingRows(clustering, rowInfo, rowDeletion, rows);
+                signaledListenerForRow = true;
+            }
+        }
+
+        private void maybeSignalListenerForCell(Cell merged, Cell[] versions)
+        {
+            if (listener != null)
+            {
+                maybeSignalListenerForRow();
+                listener.onMergedCells(merged, versions);
+            }
+        }
+
+        private void maybeSignalEndOfRow()
+        {
+            if (listener != null)
+            {
+                // If we haven't signaled the listener yet (we had no cells but some deletion info), do it now
+                maybeSignalListenerForRow();
+                listener.onRowDone();
+            }
+        }
+
+        private void reconcileCells(DeletionTime activeDeletion, Row.Writer writer)
+        {
+            Cell reconciled = null;
+            for (int j = 0; j < cells.length; j++)
+            {
+                Cell cell = cells[j];
+                if (cell != null && !activeDeletion.deletes(cell.livenessInfo()))
+                    reconciled = Cells.reconcile(reconciled, cell, nowInSec);
+            }
+
+            if (reconciled != null)
+            {
+                reconciled.writeTo(writer);
+                maybeSignalListenerForCell(reconciled, cells);
+            }
+        }
+
+        private void mergeComplex(DeletionTime activeDeletion, ColumnDefinition c)
+        {
+            complexCells.clear();
+            for (int j = 0; j < rows.length; j++)
+            {
+                Row row = rows[j];
+                Iterator<Cell> iter = row == null ? null : row.getCells(c);
+                complexCells.add(iter == null ? Iterators.<Cell>emptyIterator() : iter);
+            }
+
+            complexReducer.column = c;
+            complexReducer.activeDeletion = activeDeletion;
+
+            // Note that we use the mergeIterator only to group cells to merge, but we
+            // write the result to the writer directly in the reducer, so all we care
+            // about is iterating over the result.
+            Iterator<Void> iter = MergeIterator.get(complexCells, c.cellComparator(), complexReducer);
+            while (iter.hasNext())
+                iter.next();
+        }
+
+        private class ComplexColumnReducer extends MergeIterator.Reducer<Cell, Void>
+        {
+            private DeletionTime activeDeletion;
+            private Row.Writer writer;
+            private ColumnDefinition column;
+
+            public void reduce(int idx, Cell current)
+            {
+                cells[idx] = current;
+            }
+
+            protected Void getReduced()
+            {
+                reconcileCells(activeDeletion, writer);
+                return null;
+            }
+
+            protected void onKeyChange()
+            {
+                Arrays.fill(cells, null);
+            }
+        }
+
+        private static class StaticMerger extends Merger
+        {
+            private final StaticRow.Builder builder;
+
+            private StaticMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+            {
+                super(metadata, size, nowInSec, columns, listener);
+                this.builder = StaticRow.builder(columns, true, metadata.isCounter());
+            }
+
+            protected Row.Writer getWriter()
+            {
+                return builder;
+            }
+
+            protected Row getRow()
+            {
+                return builder.build();
+            }
+        }
+
+        private static class RegularMerger extends Merger
+        {
+            private final ReusableRow row;
+
+            private RegularMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener)
+            {
+                super(metadata, size, nowInSec, columns, listener);
+                this.row = new ReusableRow(metadata.clusteringColumns().size(), columns, true, metadata.isCounter());
+            }
+
+            protected Row.Writer getWriter()
+            {
+                return row.writer();
+            }
+
+            protected Row getRow()
+            {
+                return row;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
new file mode 100644
index 0000000..51383a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Comparator;
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.db.*;
+
+public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered>
+{
+    private final ClusteringComparator clusteringComparator;
+    private final Comparator<Clusterable> comparator;
+    private final boolean reversed;
+
+    private Iterator<Row> rowIter;
+    private Row nextRow;
+
+    private Iterator<RangeTombstone> tombstoneIter;
+    private RangeTombstone nextTombstone;
+    private boolean inTombstone;
+
+    private Unfiltered next;
+
+    public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed)
+    {
+        this.clusteringComparator = comparator;
+        this.comparator = reversed ? comparator.reversed() : comparator;
+        this.reversed = reversed;
+    }
+
+    public RowAndTombstoneMergeIterator setTo(Iterator<Row> rowIter, Iterator<RangeTombstone> tombstoneIter)
+    {
+        this.rowIter = rowIter;
+        this.tombstoneIter = tombstoneIter;
+        this.nextRow = null;
+        this.nextTombstone = null;
+        this.next = null;
+        this.inTombstone = false;
+        return this;
+    }
+
+    public boolean isSet()
+    {
+        return rowIter != null;
+    }
+
+    private void prepareNext()
+    {
+        if (next != null)
+            return;
+
+        if (nextTombstone == null && tombstoneIter.hasNext())
+            nextTombstone = tombstoneIter.next();
+        if (nextRow == null && rowIter.hasNext())
+            nextRow = rowIter.next();
+
+        if (nextTombstone == null)
+        {
+            if (nextRow == null)
+                return;
+
+            next = nextRow;
+            nextRow = null;
+        }
+        else if (nextRow == null)
+        {
+            if (inTombstone)
+            {
+                RangeTombstone rt = nextTombstone;
+                nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
+                if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)))
+                {
+                    next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
+                                                                     rt.deletedSlice().close(reversed),
+                                                                     nextTombstone.deletedSlice().open(reversed),
+                                                                     rt.deletionTime(),
+                                                                     nextTombstone.deletionTime());
+                }
+                else
+                {
+                    inTombstone = false;
+                    next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime());
+                }
+            }
+            else
+            {
+                inTombstone = true;
+                next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime());
+            }
+        }
+        else if (inTombstone)
+        {
+            if (comparator.compare(nextTombstone.deletedSlice().close(reversed), nextRow.clustering()) < 0)
+            {
+                RangeTombstone rt = nextTombstone;
+                nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null;
+                if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)))
+                {
+                    next = RangeTombstoneBoundaryMarker.makeBoundary(reversed,
+                                                                     rt.deletedSlice().close(reversed),
+                                                                     nextTombstone.deletedSlice().open(reversed),
+                                                                     rt.deletionTime(),
+                                                                     nextTombstone.deletionTime());
+                }
+                else
+                {
+                    inTombstone = false;
+                    next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime());
+                }
+            }
+            else
+            {
+                next = nextRow;
+                nextRow = null;
+            }
+        }
+        else
+        {
+            if (comparator.compare(nextTombstone.deletedSlice().open(reversed), nextRow.clustering()) < 0)
+            {
+                inTombstone = true;
+                next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime());
+            }
+            else
+            {
+                next = nextRow;
+                nextRow = null;
+            }
+        }
+    }
+
+    public boolean hasNext()
+    {
+        prepareNext();
+        return next != null;
+    }
+
+    public Unfiltered next()
+    {
+        prepareNext();
+        Unfiltered toReturn = next;
+        next = null;
+        return toReturn;
+    }
+
+    public Unfiltered peek()
+    {
+        prepareNext();
+        return next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
new file mode 100644
index 0000000..b1e2b13
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * A {@code RowDataBlock} holds data for one or more row (of a given table). More precisely, it contains
+ * cell data  and complex deletion data (for complex columns) and allow access to this data. Please note
+ * however that {@code RowDataBlock} only holds the data inside the row, it does not hold the data
+ * pertaining to the row itself: clustering, partition key liveness info and row deletion.
+ * <p>
+ * {@code RowDataBlock} is largely an implementation detail: it is only there to be reused by
+ * {@link AbstractPartitionData} and every concrete row implementation.
+ */
+public class RowDataBlock
+{
+    private static final Logger logger = LoggerFactory.getLogger(RowDataBlock.class);
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new RowDataBlock(Columns.NONE, 0, false, false));
+
+    // We distinguish 2 sub-objects: SimpleRowDataBlock that contains the data for the simple columns only,
+    // and ComplexRowDataBlock that only contains data for complex columns. The reason for having 2 separate
+    // objects is that simple columns are much easier to handle since we have only a single cell per-object
+    // and thus having a more specialized object allow a simpler and more efficient handling.
+    final SimpleRowDataBlock simpleData;
+    final ComplexRowDataBlock complexData;
+
+    public RowDataBlock(Columns columns, int rows, boolean sortable, boolean isCounter)
+    {
+        this.simpleData = columns.hasSimple() ? new SimpleRowDataBlock(columns, rows, isCounter) : null;
+        this.complexData = columns.hasComplex() ? ComplexRowDataBlock.create(columns, rows, sortable, isCounter) : null;
+    }
+
+    public Columns columns()
+    {
+        if (simpleData != null)
+            return simpleData.columns();
+        if (complexData != null)
+            return complexData.columns();
+        return Columns.NONE;
+    }
+
+    /**
+     * Return the cell value for a given column of a given row.
+     *
+     * @param row the row for which to return the cell value.
+     * @param column the column for which to return the cell value.
+     * @param path the cell path for which to return the cell value. Can be null for
+     * simple columns.
+     *
+     * @return the value of the cell of path {@code path} for {@code column} in row {@code row}, or
+     * {@code null} if their is no such cell.
+     */
+    public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path)
+    {
+        if (column.isComplex())
+        {
+            return complexData.getValue(row, column, path);
+        }
+        else
+        {
+            int idx = columns().simpleIdx(column, 0);
+            assert idx >= 0;
+            return simpleData.data.value((row * columns().simpleColumnCount()) + idx);
+        }
+    }
+
+    /**
+     * Sets the cell value for a given simple column of a given row.
+     *
+     * @param row the row for which to set the cell value.
+     * @param column the simple column for which to set the cell value.
+     * @param path the cell path for which to return the cell value. Can be null for
+     * simple columns.
+     * @param value the value to set.
+     */
+    public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        if (column.isComplex())
+        {
+            complexData.setValue(row, column, path, value);
+        }
+        else
+        {
+            int idx = columns().simpleIdx(column, 0);
+            assert idx >= 0;
+            simpleData.data.setValue((row * columns().simpleColumnCount()) + idx, value);
+        }
+    }
+
+    public static ReusableIterator reusableIterator()
+    {
+        return new ReusableIterator();
+    }
+
+    // Swap row i and j
+    public void swap(int i, int j)
+    {
+        if (simpleData != null)
+            simpleData.swap(i, j);
+        if (complexData != null)
+            complexData.swap(i, j);
+    }
+
+    // Merge row i into j
+    public void merge(int i, int j, int nowInSec)
+    {
+        if (simpleData != null)
+            simpleData.merge(i, j, nowInSec);
+        if (complexData != null)
+            complexData.merge(i, j, nowInSec);
+    }
+
+    // Move row i into j
+    public void move(int i, int j)
+    {
+        if (simpleData != null)
+            simpleData.move(i, j);
+        if (complexData != null)
+            complexData.move(i, j);
+    }
+
+    public boolean hasComplexDeletion(int row)
+    {
+        return complexData != null && complexData.hasComplexDeletion(row);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE
+             + (simpleData == null ? 0 : simpleData.unsharedHeapSizeExcludingData())
+             + (complexData == null ? 0 : complexData.unsharedHeapSizeExcludingData());
+    }
+
+    public static int computeNewCapacity(int currentCapacity, int idxToSet)
+    {
+        int newCapacity = currentCapacity == 0 ? 4 : currentCapacity;
+        while (idxToSet >= newCapacity)
+            newCapacity = 1 + (newCapacity * 3) / 2;
+        return newCapacity;
+    }
+
+    public int dataSize()
+    {
+        return (simpleData == null ? 0 : simpleData.dataSize())
+             + (complexData == null ? 0 : complexData.dataSize());
+    }
+
+    public void clear()
+    {
+        if (simpleData != null)
+            simpleData.clear();
+        if (complexData != null)
+            complexData.clear();
+    }
+
+    public abstract static class Writer implements Row.Writer
+    {
+        private final boolean inOrderCells;
+
+        protected int row;
+
+        protected SimpleRowDataBlock.CellWriter simpleWriter;
+        protected ComplexRowDataBlock.CellWriter complexWriter;
+
+        protected Writer(boolean inOrderCells)
+        {
+            this.inOrderCells = inOrderCells;
+        }
+
+        protected Writer(RowDataBlock data, boolean inOrderCells)
+        {
+            this(inOrderCells);
+            updateWriter(data);
+        }
+
+        protected void updateWriter(RowDataBlock data)
+        {
+            this.simpleWriter = data.simpleData == null ? null : data.simpleData.cellWriter(inOrderCells);
+            this.complexWriter = data.complexData == null ? null : data.complexData.cellWriter(inOrderCells);
+        }
+
+        public Writer reset()
+        {
+            row = 0;
+
+            if (simpleWriter != null)
+                simpleWriter.reset();
+            if (complexWriter != null)
+                complexWriter.reset();
+
+            return this;
+        }
+
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            if (column.isComplex())
+                complexWriter.addCell(column, value, info, path);
+            else
+                simpleWriter.addCell(column, value, info);
+        }
+
+        public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
+        {
+            if (complexDeletion.isLive())
+                return;
+
+            complexWriter.setComplexDeletion(c, complexDeletion);
+        }
+
+        public void endOfRow()
+        {
+            ++row;
+            if (simpleWriter != null)
+                simpleWriter.endOfRow();
+            if (complexWriter != null)
+                complexWriter.endOfRow();
+        }
+    }
+
+    static class ReusableIterator extends UnmodifiableIterator<Cell> implements Iterator<Cell>
+    {
+        private SimpleRowDataBlock.ReusableIterator simpleIterator;
+        private ComplexRowDataBlock.ReusableIterator complexIterator;
+
+        public ReusableIterator()
+        {
+            this.simpleIterator = SimpleRowDataBlock.reusableIterator();
+            this.complexIterator = ComplexRowDataBlock.reusableIterator();
+        }
+
+        public ReusableIterator setTo(RowDataBlock dataBlock, int row)
+        {
+            simpleIterator.setTo(dataBlock.simpleData, row);
+            complexIterator.setTo(dataBlock.complexData, row);
+            return this;
+        }
+
+        public boolean hasNext()
+        {
+            return simpleIterator.hasNext() || complexIterator.hasNext();
+        }
+
+        public Cell next()
+        {
+            return simpleIterator.hasNext() ? simpleIterator.next() : complexIterator.next();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java b/src/java/org/apache/cassandra/db/rows/RowIterator.java
new file mode 100644
index 0000000..69994dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * An iterator over rows belonging to a partition.
+ *
+ * A RowIterator is an UnfilteredRowIterator to which any deletion information has been
+ * filtered out. As such, all cell of all rows returned by this iterator are,
+ * by definition, live, and hence code using a RowIterator don't have to worry
+ * about tombstones and other deletion information.
+ *
+ * Note that as for UnfilteredRowIterator, the rows returned must be in clustering order (or
+ * reverse clustering order if isReverseOrder is true), and the Row objects returned
+ * by next() are only valid until the next call to hasNext() or next().
+ */
+public interface RowIterator extends Iterator<Row>, AutoCloseable
+{
+    /**
+     * The metadata for the table this iterator on.
+     */
+    public CFMetaData metadata();
+
+    /**
+     * Whether or not the rows returned by this iterator are in reversed
+     * clustering order.
+     */
+    public boolean isReverseOrder();
+
+    /**
+     * A subset of the columns for the (static and regular) rows returned by this iterator.
+     * Every row returned by this iterator must guarantee that it has only those columns.
+     */
+    public PartitionColumns columns();
+
+    /**
+     * The partition key of the partition this in an iterator over.
+     */
+    public DecoratedKey partitionKey();
+
+    /**
+     * The static part corresponding to this partition (this can be an empty
+     * row).
+     */
+    public Row staticRow();
+
+    public void close();
+
+    /**
+     * Returns whether the provided iterator has no data.
+     */
+    public default boolean isEmpty()
+    {
+        return staticRow().isEmpty() && !hasNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
new file mode 100644
index 0000000..a3bd913
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Static methods to work with row iterators.
+ */
+public abstract class RowIterators
+{
+    private static final Logger logger = LoggerFactory.getLogger(RowIterators.class);
+
+    private RowIterators() {}
+
+    public static PartitionUpdate toUpdate(RowIterator iterator)
+    {
+        PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
+
+        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+            iterator.staticRow().copyTo(update.staticWriter());
+
+        while (iterator.hasNext())
+            iterator.next().copyTo(update.writer());
+
+        return update;
+    }
+
+    public static void digest(RowIterator iterator, MessageDigest digest)
+    {
+        // TODO: we're not computing digest the same way that old nodes so we'll need
+        // to pass the version we're computing the digest for and deal with that.
+        digest.update(iterator.partitionKey().getKey().duplicate());
+        iterator.columns().digest(digest);
+        FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
+        iterator.staticRow().digest(digest);
+
+        while (iterator.hasNext())
+            iterator.next().digest(digest);
+    }
+
+    public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    {
+        return new RowIterator()
+        {
+            public CFMetaData metadata()
+            {
+                return cfm;
+            }
+
+            public boolean isReverseOrder()
+            {
+                return isReverseOrder;
+            }
+
+            public PartitionColumns columns()
+            {
+                return PartitionColumns.NONE;
+            }
+
+            public DecoratedKey partitionKey()
+            {
+                return partitionKey;
+            }
+
+            public Row staticRow()
+            {
+                return Rows.EMPTY_STATIC_ROW;
+            }
+
+            public boolean hasNext()
+            {
+                return false;
+            }
+
+            public Row next()
+            {
+                throw new NoSuchElementException();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+            }
+        };
+    }
+
+    /**
+     * Wraps the provided iterator so it logs the returned rows for debugging purposes.
+     * <p>
+     * Note that this is only meant for debugging as this can log a very large amount of
+     * logging at INFO.
+     */
+    public static RowIterator loggingIterator(RowIterator iterator, final String id)
+    {
+        CFMetaData metadata = iterator.metadata();
+        logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}",
+                    new Object[]{ id,
+                                  metadata.ksName,
+                                  metadata.cfName,
+                                  metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+                                  iterator.isReverseOrder() });
+
+        return new WrappingRowIterator(iterator)
+        {
+            @Override
+            public Row staticRow()
+            {
+                Row row = super.staticRow();
+                if (!row.isEmpty())
+                    logger.info("[{}] {}", id, row.toString(metadata()));
+                return row;
+            }
+
+            @Override
+            public Row next()
+            {
+                Row next = super.next();
+                logger.info("[{}] {}", id, next.toString(metadata()));
+                return next;
+            }
+        };
+    }
+}