You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:43 UTC

[19/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java
new file mode 100644
index 0000000..1bffdbe
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowStats.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP;
+import static org.apache.cassandra.db.LivenessInfo.NO_TTL;
+import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME;
+
+/**
+ * General statistics on rows (and tombstones) for a given source.
+ * <p>
+ * Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
+ * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
+ * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
+ * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * <p>
+ * Note that due to their use, those stats can afford to be somewhat inaccurate (the more inaccurate
+ * they are, the less effective the storage will be, but provided the stats are not completely wacky,
+ * this shouldn't have too huge an impact on performance) and in fact they will not always be
+ * accurate for reasons explained in {@link SerializationHeader#make}.
+ */
+public class RowStats
+{
+    /**
+     * Stats carrying no information: every minimum is set to its "no value" sentinel and the
+     * average column count is unknown ({@code -1}).
+     */
+    // We should use this sparingly obviously
+    public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_DELETION_TIME, NO_TTL, -1);
+
+    public static final Serializer serializer = new Serializer();
+
+    /** Minimum timestamp, or {@code NO_TIMESTAMP} if none was recorded. */
+    public final long minTimestamp;
+    /** Minimum local deletion time, or {@code NO_DELETION_TIME} if none was recorded. */
+    public final int minLocalDeletionTime;
+    /** Minimum TTL, or {@code NO_TTL} if none was recorded. */
+    public final int minTTL;
+
+    // Will be < 0 if the value is unknown
+    public final int avgColumnSetPerRow;
+
+    /**
+     * Creates stats from already computed values.
+     *
+     * @param minTimestamp the minimum timestamp, or {@code NO_TIMESTAMP}.
+     * @param minLocalDeletionTime the minimum local deletion time, or {@code NO_DELETION_TIME}.
+     * @param minTTL the minimum TTL, or {@code NO_TTL}.
+     * @param avgColumnSetPerRow the average number of columns set per row, or a negative value if unknown.
+     */
+    public RowStats(long minTimestamp,
+                    int minLocalDeletionTime,
+                    int minTTL,
+                    int avgColumnSetPerRow)
+    {
+        this.minTimestamp = minTimestamp;
+        this.minLocalDeletionTime = minLocalDeletionTime;
+        this.minTTL = minTTL;
+        this.avgColumnSetPerRow = avgColumnSetPerRow;
+    }
+
+    /**
+     * @return whether a minimum timestamp is set (i.e. it is not {@code NO_TIMESTAMP}).
+     */
+    public boolean hasMinTimestamp()
+    {
+        return minTimestamp != NO_TIMESTAMP;
+    }
+
+    /**
+     * @return whether a minimum local deletion time is set (i.e. it is not {@code NO_DELETION_TIME}).
+     */
+    public boolean hasMinLocalDeletionTime()
+    {
+        return minLocalDeletionTime != NO_DELETION_TIME;
+    }
+
+    /**
+     * Merge this stats with another one.
+     * <p>
+     * For each minimum, a side holding the "no value" sentinel is ignored; otherwise the smaller
+     * of the two values is kept. For the average column count, an unknown (negative) side is
+     * ignored; otherwise the result is the mean of the two averages.
+     * <p>
+     * The comments of {@link SerializationHeader#make} applies here too, i.e. the result of
+     * merging will be not totally accurate but we can live with that.
+     *
+     * @param that the stats to merge with this one.
+     * @return a new {@code RowStats} holding the merged values; neither input is modified.
+     */
+    public RowStats mergeWith(RowStats that)
+    {
+        long minTimestamp = this.minTimestamp == NO_TIMESTAMP
+                          ? that.minTimestamp
+                          : (that.minTimestamp == NO_TIMESTAMP ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp));
+
+        int minDelTime = this.minLocalDeletionTime == NO_DELETION_TIME
+                       ? that.minLocalDeletionTime
+                       : (that.minLocalDeletionTime == NO_DELETION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime));
+
+        int minTTL = this.minTTL == NO_TTL
+                   ? that.minTTL
+                   : (that.minTTL == NO_TTL ? this.minTTL : Math.min(this.minTTL, that.minTTL));
+
+        // The sum is computed as a long: adding two large non-negative ints directly could overflow
+        // and yield a negative average, which would then be read as "unknown".
+        int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
+                               ? that.avgColumnSetPerRow
+                               : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (int) (((long) this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2));
+
+        return new RowStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RowStats rowStats = (RowStats) o;
+
+        if (avgColumnSetPerRow != rowStats.avgColumnSetPerRow) return false;
+        if (minLocalDeletionTime != rowStats.minLocalDeletionTime) return false;
+        if (minTTL != rowStats.minTTL) return false;
+        if (minTimestamp != rowStats.minTimestamp) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+    }
+
+    /**
+     * Mutable accumulator used to build a {@link RowStats} incrementally.
+     * <p>
+     * Each {@code update*} method ignores "no value" inputs; {@link #get} reports the sentinel
+     * for any statistic that never received a real value.
+     */
+    public static class Collector
+    {
+        private boolean isTimestampSet;
+        private long minTimestamp = Long.MAX_VALUE;
+
+        private boolean isDelTimeSet;
+        private int minDeletionTime = Integer.MAX_VALUE;
+
+        private boolean isTTLSet;
+        private int minTTL = Integer.MAX_VALUE;
+
+        private boolean isColumnSetPerRowSet;
+        private long totalColumnsSet;
+        private long rows;
+
+        /**
+         * Records a timestamp; {@code NO_TIMESTAMP} is ignored.
+         */
+        public void updateTimestamp(long timestamp)
+        {
+            if (timestamp == NO_TIMESTAMP)
+                return;
+
+            isTimestampSet = true;
+            minTimestamp = Math.min(minTimestamp, timestamp);
+        }
+
+        /**
+         * Records a local deletion time; {@code NO_DELETION_TIME} is ignored.
+         */
+        public void updateLocalDeletionTime(int deletionTime)
+        {
+            if (deletionTime == NO_DELETION_TIME)
+                return;
+
+            isDelTimeSet = true;
+            minDeletionTime = Math.min(minDeletionTime, deletionTime);
+        }
+
+        /**
+         * Records both the timestamp and the local deletion time of a deletion; a live
+         * deletion time is ignored.
+         */
+        public void updateDeletionTime(DeletionTime deletionTime)
+        {
+            if (deletionTime.isLive())
+                return;
+
+            updateTimestamp(deletionTime.markedForDeleteAt());
+            updateLocalDeletionTime(deletionTime.localDeletionTime());
+        }
+
+        /**
+         * Records a TTL; values less than or equal to {@code NO_TTL} are ignored.
+         */
+        public void updateTTL(int ttl)
+        {
+            if (ttl <= NO_TTL)
+                return;
+
+            isTTLSet = true;
+            minTTL = Math.min(minTTL, ttl);
+        }
+
+        /**
+         * Records the number of columns set in a single row.
+         */
+        public void updateColumnSetPerRow(int columnSetInRow)
+        {
+            updateColumnSetPerRow(columnSetInRow, 1);
+        }
+
+        /**
+         * Adds a pre-aggregated column count covering {@code rows} rows; the call is ignored
+         * if either argument is negative.
+         */
+        public void updateColumnSetPerRow(long totalColumnsSet, long rows)
+        {
+            if (totalColumnsSet < 0 || rows < 0)
+                return;
+
+            this.isColumnSetPerRowSet = true;
+            this.totalColumnsSet += totalColumnsSet;
+            this.rows += rows;
+        }
+
+        /**
+         * @return the stats accumulated so far. The average column count is {@code -1} if no
+         * column count was ever recorded, and {@code 0} if counts were recorded for zero rows.
+         */
+        public RowStats get()
+        {
+            // Clamp before narrowing so an oversized average cannot wrap to a negative ("unknown") int.
+            return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP,
+                                 isDelTimeSet ? minDeletionTime : NO_DELETION_TIME,
+                                 isTTLSet ? minTTL : NO_TTL,
+                                 isColumnSetPerRowSet ? (rows == 0 ? 0 : (int) Math.min(Integer.MAX_VALUE, totalColumnsSet / rows)) : -1);
+        }
+    }
+
+    /**
+     * Serializes a {@link RowStats} as a long (min timestamp) followed by three ints
+     * (min local deletion time, min TTL, average columns per row), in that order.
+     */
+    public static class Serializer
+    {
+        public void serialize(RowStats stats, DataOutputPlus out) throws IOException
+        {
+            out.writeLong(stats.minTimestamp);
+            out.writeInt(stats.minLocalDeletionTime);
+            out.writeInt(stats.minTTL);
+            out.writeInt(stats.avgColumnSetPerRow);
+        }
+
+        public int serializedSize(RowStats stats, TypeSizes sizes)
+        {
+            return sizes.sizeof(stats.minTimestamp)
+                 + sizes.sizeof(stats.minLocalDeletionTime)
+                 + sizes.sizeof(stats.minTTL)
+                 + sizes.sizeof(stats.avgColumnSetPerRow);
+        }
+
+        /**
+         * Reads the fields in the same order {@link #serialize} writes them.
+         */
+        public RowStats deserialize(DataInput in) throws IOException
+        {
+            long minTimestamp = in.readLong();
+            int minLocalDeletionTime = in.readInt();
+            int minTTL = in.readInt();
+            int avgColumnSetPerRow = in.readInt();
+            return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
new file mode 100644
index 0000000..76dcf60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Static utilities to work on Row objects.
+ */
+public abstract class Rows
+{
+    private static final Logger logger = LoggerFactory.getLogger(Rows.class);
+
+    // Not instantiable: this class only holds static members.
+    private Rows() {}
+
+    /**
+     * A shared empty row using the static clustering: it has no columns, no primary key
+     * liveness info, a live deletion and no cells.
+     */
+    public static final Row EMPTY_STATIC_ROW = new AbstractRow()
+    {
+        public Columns columns()
+        {
+            return Columns.NONE;
+        }
+
+        public LivenessInfo primaryKeyLivenessInfo()
+        {
+            return LivenessInfo.NONE;
+        }
+
+        public DeletionTime deletion()
+        {
+            return DeletionTime.LIVE;
+        }
+
+        public boolean isEmpty()
+        {
+            return true;
+        }
+
+        public boolean hasComplexDeletion()
+        {
+            return false;
+        }
+
+        public Clustering clustering()
+        {
+            return Clustering.STATIC_CLUSTERING;
+        }
+
+        public Cell getCell(ColumnDefinition c)
+        {
+            return null;
+        }
+
+        public Cell getCell(ColumnDefinition c, CellPath path)
+        {
+            return null;
+        }
+
+        // NOTE(review): returns null rather than an empty iterator, so callers must
+        // null-check -- confirm this matches the contract of Row.getCells.
+        public Iterator<Cell> getCells(ColumnDefinition c)
+        {
+            return null;
+        }
+
+        public DeletionTime getDeletion(ColumnDefinition c)
+        {
+            return DeletionTime.LIVE;
+        }
+
+        public Iterator<Cell> iterator()
+        {
+            return Iterators.<Cell>emptyIterator();
+        }
+
+        // A search iterator that is always exhausted and never finds any column.
+        public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+        {
+            return new SearchIterator<ColumnDefinition, ColumnData>()
+            {
+                public boolean hasNext()
+                {
+                    return false;
+                }
+
+                public ColumnData next(ColumnDefinition column)
+                {
+                    return null;
+                }
+            };
+        }
+
+        public Kind kind()
+        {
+            return Unfiltered.Kind.ROW;
+        }
+
+        // The row holds no mutable state, so it can serve as its own alias.
+        public Row takeAlias()
+        {
+            return this;
+        }
+    };
+
+    /**
+     * Callbacks describing cell-level changes (added, removed, updated).
+     * No caller is visible in this class; presumably invoked by merge code elsewhere -- TODO confirm.
+     */
+    public interface SimpleMergeListener
+    {
+        public void onAdded(Cell newCell);
+        public void onRemoved(Cell removedCell);
+        public void onUpdated(Cell existingCell, Cell updatedCell);
+    }
+
+    /**
+     * Writes every value of {@code clustering}, in index order, to {@code writer}.
+     */
+    public static void writeClustering(Clustering clustering, Row.Writer writer)
+    {
+        for (int i = 0; i < clustering.size(); i++)
+            writer.writeClusteringValue(clustering.get(i));
+    }
+
+    /**
+     * Merges two rows into {@code writer} without index maintenance: delegates to the
+     * six-argument overload with {@code SecondaryIndexManager.nullUpdater} and discards
+     * the returned timestamp delta.
+     */
+    public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Writer writer, int nowInSec)
+    {
+        merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater);
+    }
+
+    // Merge rows in memtable
+    // Return the minimum timestamp delta between existing and update
+    //
+    // The merged row is emitted to 'writer' in this order: the clustering (taken from
+    // 'existing'), the merged primary key liveness info, the row deletion, then the
+    // reconciled cells of each simple column followed by each complex column of
+    // 'mergedColumns', and finally endOfRow(). 'indexUpdater' is notified along the way.
+    public static long merge(Row existing,
+                             Row update,
+                             Columns mergedColumns,
+                             Row.Writer writer,
+                             int nowInSec,
+                             SecondaryIndexManager.Updater indexUpdater)
+    {
+        // Only the clustering of 'existing' is used; presumably both rows share it -- verify against callers.
+        Clustering clustering = existing.clustering();
+        writeClustering(clustering, writer);
+
+        LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
+        LivenessInfo updateInfo = update.primaryKeyLivenessInfo();
+        LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo);
+
+        // NOTE(review): if either timestamp can be a "no timestamp" sentinel with extreme
+        // magnitude, this subtraction could overflow -- confirm the possible values.
+        long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());
+
+        // Keep whichever row deletion supersedes the other.
+        DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
+
+        // Liveness info covered by the row deletion is dropped from the merged row.
+        if (deletion.deletes(mergedInfo))
+            mergedInfo = LivenessInfo.NONE;
+
+        writer.writePartitionKeyLivenessInfo(mergedInfo);
+        writer.writeRowDeletion(deletion);
+
+        indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);
+
+        // Simple columns: reconcile the (possibly null) cell from each side.
+        for (int i = 0; i < mergedColumns.simpleColumnCount(); i++)
+        {
+            ColumnDefinition c = mergedColumns.getSimple(i);
+            Cell existingCell = existing.getCell(c);
+            Cell updateCell = update.getCell(c);
+            timeDelta = Math.min(timeDelta, Cells.reconcile(clustering,
+                                                            existingCell,
+                                                            updateCell,
+                                                            deletion,
+                                                            writer,
+                                                            nowInSec,
+                                                            indexUpdater));
+        }
+
+        // Complex columns: pick the superseding complex deletion, write it only if it also
+        // supersedes the row deletion, otherwise fall back to the row deletion for reconciling cells.
+        for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
+        {
+            ColumnDefinition c = mergedColumns.getComplex(i);
+            DeletionTime existingDt = existing.getDeletion(c);
+            DeletionTime updateDt = update.getDeletion(c);
+            DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt;
+            if (maxDt.supersedes(deletion))
+                writer.writeComplexDeletion(c, maxDt);
+            else
+                maxDt = deletion;
+
+            Iterator<Cell> existingCells = existing.getCells(c);
+            Iterator<Cell> updateCells = update.getCells(c);
+            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater));
+        }
+
+        writer.endOfRow();
+        return timeDelta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
new file mode 100644
index 0000000..56b993c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SerializationHelper
+{
+    /**
+     * Flag affecting deserialization behavior (this only affects counters in practice).
+     *  - LOCAL: for deserialization of local data (Expired columns are
+     *      converted to tombstones (to gain disk space)).
+     *  - FROM_REMOTE: for deserialization of data received from remote hosts
+     *      (Expired columns are converted to tombstone and counters have
+     *      their delta cleared)
+     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
+     *      when we must ensure that deserializing and reserializing the
+     *      result yield the exact same bytes. Streaming uses this.
+     */
+    public static enum Flag
+    {
+        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+    }
+
+    private final Flag flag;
+    public final int version;
+
+    // Single instance reused for every liveness info handed to the writer.
+    private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo();
+
+    // Liveness values (timestamp, ttl, localDeletionTime) of the row currently being read.
+    private long rowTimestamp;
+    private int rowTTL;
+    private int rowLocalDeletionTime;
+
+    // Null means "no filtering": every column is included and no value can be skipped.
+    private final ColumnFilter columnsToFetch;
+    // Only non-null between startOfComplexColumn and endOfComplexColumn, and only when a filter is set.
+    private ColumnFilter.Tester tester;
+
+    /**
+     * @param version the serialization version, exposed through {@link #version}.
+     * @param flag the deserialization flag driving counter handling in {@link #writeCell}.
+     * @param columnsToFetch the filter selecting columns/cells, or {@code null} for no filtering.
+     */
+    public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch)
+    {
+        this.flag = flag;
+        this.version = version;
+        this.columnsToFetch = columnsToFetch;
+    }
+
+    /**
+     * Creates a helper that does not filter columns.
+     */
+    public SerializationHelper(int version, Flag flag)
+    {
+        this(version, flag, null);
+    }
+
+    /**
+     * Writes the given liveness info as the row's primary key liveness info, then remembers
+     * the three values as the liveness of the row being read.
+     */
+    public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime)
+    {
+        livenessInfo.setTo(timestamp, ttl, localDeletionTime);
+        writer.writePartitionKeyLivenessInfo(livenessInfo);
+
+        this.rowTimestamp = timestamp;
+        this.rowTTL = ttl;
+        this.rowLocalDeletionTime = localDeletionTime;
+    }
+
+    public long getRowTimestamp()
+    {
+        return this.rowTimestamp;
+    }
+
+    public int getRowTTL()
+    {
+        return this.rowTTL;
+    }
+
+    public int getRowLocalDeletionTime()
+    {
+        return this.rowLocalDeletionTime;
+    }
+
+    /**
+     * @return {@code true} if there is no filter, otherwise whatever the filter answers for {@code column}.
+     */
+    public boolean includes(ColumnDefinition column)
+    {
+        if (columnsToFetch == null)
+            return true;
+
+        return columnsToFetch.includes(column);
+    }
+
+    /**
+     * @return {@code false} if there is no filter, otherwise whatever the filter answers for {@code column}.
+     */
+    public boolean canSkipValue(ColumnDefinition column)
+    {
+        if (columnsToFetch == null)
+            return false;
+
+        return columnsToFetch.canSkipValue(column);
+    }
+
+    /**
+     * Installs the per-cell tester for {@code column}, or none when there is no filter.
+     */
+    public void startOfComplexColumn(ColumnDefinition column)
+    {
+        if (columnsToFetch == null)
+            this.tester = null;
+        else
+            this.tester = columnsToFetch.newTester(column);
+    }
+
+    /**
+     * Discards the tester installed by {@link #startOfComplexColumn}.
+     */
+    public void endOfComplexColumn(ColumnDefinition column)
+    {
+        this.tester = null;
+    }
+
+    /**
+     * Writes one cell to {@code writer}, unless the cell belongs to a complex column and the
+     * current tester excludes its path.
+     * <p>
+     * Counter values are first passed through {@code CounterContext.clearAllLocal} when the
+     * flag is FROM_REMOTE, or when it is LOCAL and the counter context asks for it. When the
+     * current tester says the value can be skipped, an empty buffer is written instead.
+     */
+    public void writeCell(Row.Writer writer,
+                          ColumnDefinition column,
+                          boolean isCounter,
+                          ByteBuffer value,
+                          long timestamp,
+                          int localDelTime,
+                          int ttl,
+                          CellPath path)
+    {
+        livenessInfo.setTo(timestamp, ttl, localDelTime);
+
+        ByteBuffer cellValue = value;
+        boolean clearLocalShards = isCounter
+                                && (flag == Flag.FROM_REMOTE
+                                    || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(cellValue)));
+        if (clearLocalShards)
+            cellValue = CounterContext.instance().clearAllLocal(cellValue);
+
+        // Cell of a complex column that the active tester filters out: nothing to write.
+        if (column.isComplex() && tester != null && !tester.includes(path))
+            return;
+
+        if (tester != null && tester.canSkipValue(path))
+            cellValue = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        writer.writeCell(column, isCounter, cellValue, livenessInfo, path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
new file mode 100644
index 0000000..08f37fd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Holds cells data for the simple columns of one or more rows.
+ * <p>
+ * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and
+ * the (simple) columns for which the {@code SimpleRowDataBlock} has data. The cell for
+ * a row i and a column c is stored in the {@code CellData} at index 'i * n + index(c)',
+ * where n is the number of simple columns.
+ * <p>
+ * This does mean that we store cells in a "dense" way: if a column doesn't have a cell for a
+ * given row, the corresponding index in the cell data array will simply have a {@code null} value.
+ * We might want to switch to a more sparse encoding in the future but we keep it simple for
+ * now (having a sparse encoding make things a tad more complex because we need to be able to
+ * swap the cells for 2 given rows as seen in ComplexRowDataBlock).
+ */
+public class SimpleRowDataBlock
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false));
+
+    final Columns columns;
+    final CellData data;
+
+    public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter)
+    {
+        this.columns = columns;
+        this.data = new CellData(rows * columns.simpleColumnCount(), isCounter);
+    }
+
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    // Swap row i and j
+    public void swap(int i, int j)
+    {
+        int s = columns.simpleColumnCount();
+        for (int k = 0; k < s; k++)
+            data.swapCell(i * s + k, j * s + k);
+    }
+
+    // Merge row i into j
+    public void merge(int i, int j, int nowInSec)
+    {
+        int s = columns.simpleColumnCount();
+        for (int k = 0; k < s; k++)
+            data.mergeCell(i * s + k, j * s + k, nowInSec);
+    }
+
+    // Move row i into j
+    public void move(int i, int j)
+    {
+        int s = columns.simpleColumnCount();
+        for (int k = 0; k < s; k++)
+            data.moveCell(i * s + k, j * s + k);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE + data.unsharedHeapSizeExcludingData();
+    }
+
+    public int dataSize()
+    {
+        return data.dataSize();
+    }
+
+    public CellWriter cellWriter(boolean inOrderCells)
+    {
+        return new CellWriter(inOrderCells);
+    }
+
+    public static CellData.ReusableCell reusableCell()
+    {
+        return new CellData.ReusableCell();
+    }
+
+    public static ReusableIterator reusableIterator()
+    {
+        return new ReusableIterator();
+    }
+
+    public void clear()
+    {
+        data.clear();
+    }
+
+    static class ReusableIterator extends UnmodifiableIterator<Cell>
+    {
+        private SimpleRowDataBlock dataBlock;
+        private final CellData.ReusableCell cell = new CellData.ReusableCell();
+
+        private int base;
+        private int column;
+
+        private ReusableIterator()
+        {
+        }
+
+        public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row)
+        {
+            this.dataBlock = dataBlock;
+            this.base = dataBlock == null ? -1 : row * dataBlock.columns.simpleColumnCount();
+            this.column = 0;
+            return this;
+        }
+
+        public boolean hasNext()
+        {
+            if (dataBlock == null)
+                return false;
+
+            int columnCount = dataBlock.columns.simpleColumnCount();
+            // iterate over column until we find one with data
+            while (column < columnCount && !dataBlock.data.hasCell(base + column))
+                ++column;
+
+            return column < columnCount;
+        }
+
+        public Cell next()
+        {
+            cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column);
+            ++column;
+            return cell;
+        }
+    }
+
+    public class CellWriter
+    {
+        private final boolean inOrderCells;
+
+        private int base;
+        private int lastColumnIdx;
+
+        public CellWriter(boolean inOrderCells)
+        {
+            this.inOrderCells = inOrderCells;
+        }
+
+        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
+        {
+            int fromIdx = inOrderCells ? lastColumnIdx : 0;
+            lastColumnIdx = columns.simpleIdx(column, fromIdx);
+            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx;
+            int idx = base + lastColumnIdx;
+            data.setCell(idx, value, info);
+        }
+
+        public void reset()
+        {
+            base = 0;
+            lastColumnIdx = 0;
+            data.clear();
+        }
+
+        public void endOfRow()
+        {
+            base += columns.simpleColumnCount();
+            lastColumnIdx = 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
new file mode 100644
index 0000000..2250ee9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.Slice;
+
+public interface SliceableUnfilteredRowIterator extends UnfilteredRowIterator
+{
+    /**
+     * Advances this iterator (or moves it backward when isReverseOrder() is true) and
+     * returns an iterator over the {@code Unfiltered} selected by {@code slice}.
+     * <p>
+     * {@code slice} may be called several times, provided the slices do not overlap and
+     * are supplied in clustering order (reverse clustering order for a reversed iterator).
+     * <p>
+     * A call to {@code slice} may leave this iterator in an unknown state: nothing is
+     * guaranteed about what {@code hasNext} or {@code next} yield afterwards. For a given
+     * iterator, use either {@code slice} or {@code hasNext/next}, never both.
+     *
+     * @param slice the slice of the partition to select.
+     * @return an iterator over the {@code Unfiltered} covered by {@code slice}.
+     */
+    Iterator<Unfiltered> slice(Slice slice);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/StaticRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java
new file mode 100644
index 0000000..2ad9fb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/StaticRow.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * A row whose clustering is always {@link Clustering#STATIC_CLUSTERING}.
+ * <p>
+ * The cells live in a {@link RowDataBlock} that this class always addresses at row index 0,
+ * and the row carries its own deletion time. Its primary key liveness info is always
+ * {@link LivenessInfo#NONE}. Instances are created through {@link #builder}.
+ * <p>
+ * NOTE(review): most accessors hand back objects obtained from {@code reusableXXX()} helpers;
+ * presumably those are shared, reusable objects that are only valid until the next similar
+ * call. Confirm against SimpleRowDataBlock/ComplexRowDataBlock/RowDataBlock.
+ */
+public class StaticRow extends AbstractRow
+{
+    private final DeletionTime deletion;
+    private final RowDataBlock data;
+
+    private StaticRow(DeletionTime deletion, RowDataBlock data)
+    {
+        // Keep an alias rather than the provided object itself (presumably because the
+        // provided DeletionTime may be a reusable object; confirm against DeletionTime.takeAlias).
+        this.deletion = deletion.takeAlias();
+        this.data = data;
+    }
+
+    /**
+     * The columns of the underlying data block.
+     */
+    public Columns columns()
+    {
+        return data.columns();
+    }
+
+    /**
+     * Returns the cell of the provided simple column.
+     *
+     * @param c a simple (non-complex) column.
+     * @return the cell for {@code c}, or {@code null} if this row has no simple data or
+     * {@code c} is not one of its simple columns.
+     */
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        if (data.simpleData == null)
+            return null;
+
+        int idx = columns().simpleIdx(c, 0);
+        if (idx < 0)
+            return null;
+
+        return SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, idx);
+    }
+
+    /**
+     * Returns the cell of the provided complex column at the provided path.
+     *
+     * @param c a complex column.
+     * @param path the path of the cell within {@code c}.
+     * @return the corresponding cell, or {@code null} if this row has no complex data or
+     * no cell is found for {@code c} and {@code path}.
+     */
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+
+        ComplexRowDataBlock dataBlock = data.complexData;
+        if (dataBlock == null)
+            return null;
+
+        int idx = dataBlock.cellIdx(0, c, path);
+        if (idx < 0)
+            return null;
+
+        return SimpleRowDataBlock.reusableCell().setTo(dataBlock.cellData(0), c, idx);
+    }
+
+    /**
+     * Returns an iterator over the cells of the provided complex column.
+     * <p>
+     * NOTE(review): unlike the other complex accessors, {@code data.complexData} is not
+     * null-checked here; this relies on {@code setTo} coping with a null block, and
+     * {@link #searchIterator} expects a {@code null} result to be possible. Confirm.
+     *
+     * @param c a complex column.
+     */
+    public Iterator<Cell> getCells(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c);
+    }
+
+    /**
+     * Whether the underlying data block reports a complex deletion for this row.
+     */
+    public boolean hasComplexDeletion()
+    {
+        return data.hasComplexDeletion(0);
+    }
+
+    /**
+     * Returns the complex deletion of the provided complex column.
+     *
+     * @param c a complex column.
+     * @return the deletion for {@code c}, or {@link DeletionTime#LIVE} if this row has no
+     * complex data or no deletion is recorded for {@code c}.
+     */
+    public DeletionTime getDeletion(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        if (data.complexData == null)
+            return DeletionTime.LIVE;
+
+        int idx = data.complexData.complexDeletionIdx(0, c);
+        return idx < 0
+             ? DeletionTime.LIVE
+             : ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, idx);
+    }
+
+    /**
+     * Returns an iterator over the cells of this row.
+     */
+    public Iterator<Cell> iterator()
+    {
+        return RowDataBlock.reusableIterator().setTo(data, 0);
+    }
+
+    /**
+     * Returns a search iterator over the column data of this row.
+     * <p>
+     * For simple columns the search resumes from the position following the last simple
+     * column found, so simple columns are expected to be requested in increasing
+     * {@code simpleIdx} order. {@code next} returns {@code null} when the requested column
+     * has no data in this row.
+     */
+    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return new SearchIterator<ColumnDefinition, ColumnData>()
+        {
+            // Position in the simple columns from which the next search starts
+            private int simpleIdx = 0;
+
+            public boolean hasNext()
+            {
+                // TODO: we can do better, but we expect users to not rely on this anyway
+                return true;
+            }
+
+            public ColumnData next(ColumnDefinition column)
+            {
+                if (column.isComplex())
+                {
+                    // TODO: this is sub-optimal
+
+                    Iterator<Cell> cells = getCells(column);
+                    return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
+                }
+                else
+                {
+                    // Same guards as getCell(ColumnDefinition): a row without simple data, or a
+                    // column that isn't found, yields null instead of an NPE or a negative index.
+                    if (data.simpleData == null)
+                        return null;
+
+                    int idx = columns().simpleIdx(column, simpleIdx);
+                    if (idx < 0)
+                        return null;
+
+                    Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, idx);
+                    // Only advance on a hit so that a miss doesn't corrupt the search position
+                    simpleIdx = idx + 1;
+                    return cell == null ? null : new ColumnData(column, cell, null, null);
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns this object itself.
+     */
+    public Row takeAlias()
+    {
+        return this;
+    }
+
+    /**
+     * Always {@link Clustering#STATIC_CLUSTERING}.
+     */
+    public Clustering clustering()
+    {
+        return Clustering.STATIC_CLUSTERING;
+    }
+
+    /**
+     * Always {@link LivenessInfo#NONE}.
+     */
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return LivenessInfo.NONE;
+    }
+
+    /**
+     * The deletion of this row, as provided at construction.
+     */
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    /**
+     * Creates a new builder for a static row.
+     *
+     * @param columns the columns of the row to build.
+     * @param inOrderCells passed as-is to {@code RowDataBlock.Writer}; presumably whether
+     * cells will be written in column order (confirm against RowDataBlock.Writer).
+     * @param isCounter passed as-is to the {@code RowDataBlock} constructor.
+     */
+    public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter)
+    {
+        return new Builder(columns, inOrderCells, isCounter);
+    }
+
+    /**
+     * Builder for {@link StaticRow}: cells are written through the inherited
+     * {@code RowDataBlock.Writer} methods into a fresh data block, and {@link #build}
+     * wraps that block together with the last row deletion written (LIVE by default).
+     */
+    public static class Builder extends RowDataBlock.Writer
+    {
+        private final RowDataBlock data;
+        private DeletionTime deletion = DeletionTime.LIVE;
+
+        public Builder(Columns columns, boolean inOrderCells, boolean isCounter)
+        {
+            super(inOrderCells);
+            this.data = new RowDataBlock(columns, 1, false, isCounter);
+            updateWriter(data);
+        }
+
+        /**
+         * Always throws: this builder does not accept clustering values.
+         *
+         * @throws UnsupportedOperationException always.
+         */
+        public void writeClusteringValue(ByteBuffer buffer)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void writePartitionKeyLivenessInfo(LivenessInfo info)
+        {
+            // Static rows are special and don't really have an existence unless they have live cells,
+            // so we shouldn't have any partition key liveness info.
+            assert info.equals(LivenessInfo.NONE);
+        }
+
+        /**
+         * Records the deletion that the built row will carry.
+         */
+        public void writeRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        /**
+         * Builds the static row from the data and deletion written so far.
+         */
+        public StaticRow build()
+        {
+            return new StaticRow(deletion, data);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
new file mode 100644
index 0000000..a1c0ddc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+
+/**
+ * A {@code FilteringRow} that includes no deletion time and only includes the cells
+ * that are live at a given time.
+ */
+public class TombstoneFilteringRow extends FilteringRow
+{
+    // The time (in seconds) against which cell liveness is evaluated
+    private final int nowInSec;
+
+    /**
+     * @param nowInSec the time, in seconds, passed to {@code Cell.isLive} to decide
+     * whether a cell is included.
+     */
+    public TombstoneFilteringRow(int nowInSec)
+    {
+        this.nowInSec = nowInSec;
+    }
+
+    /**
+     * A cell is included if and only if it is live at {@code nowInSec}.
+     */
+    @Override
+    protected boolean include(Cell cell)
+    {
+        final boolean live = cell.isLive(nowInSec);
+        return live;
+    }
+
+    /**
+     * Deletion times are never included.
+     */
+    @Override
+    protected boolean include(DeletionTime dt)
+    {
+        return false;
+    }
+
+    /**
+     * Deletion times associated with a column are never included.
+     */
+    @Override
+    protected boolean include(ColumnDefinition c, DeletionTime dt)
+    {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
new file mode 100644
index 0000000..b1692e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clusterable;
+
+/**
+ * Unfiltered is the common class for the main constituent of an unfiltered partition.
+ * <p>
+ * In practice, an Unfiltered is either a row or a range tombstone marker. Unfiltereds
+ * are uniquely identified by their clustering information and can be sorted according
+ * to those.
+ */
+public interface Unfiltered extends Clusterable
+{
+    public enum Kind { ROW, RANGE_TOMBSTONE_MARKER };
+
+    /**
+     * The kind of the atom: either row or range tombstone marker.
+     */
+    public Kind kind();
+
+    /**
+     * Digest the atom using the provided {@code MessageDigest}.
+     *
+     * @param digest the {@code MessageDigest} to use.
+     */
+    public void digest(MessageDigest digest);
+
+    /**
+     * Validate the data of this atom.
+     *
+     * @param metadata the metadata for the table this atom is part of.
+     * @throws MarshalException if some of the data in this atom is
+     * invalid (some value is invalid for its column type, or some field
+     * is nonsensical).
+     */
+    public void validateData(CFMetaData metadata);
+
+    public String toString(CFMetaData metadata);
+    public String toString(CFMetaData metadata, boolean fullDetails);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
new file mode 100644
index 0000000..a3ecf6d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * An iterator over the rows of a given partition that also includes deletion information.
+ * <p>
+ * An {@code UnfilteredRowIterator} exposes a few partition-level pieces of information and is an
+ * iterator of {@code Unfiltered}, i.e. of either {@code Row} or {@code RangeTombstoneMarker}.
+ * Every implementation of {@code UnfilteredRowIterator} <b>must</b> provide the following
+ * guarantees:
+ *   1. the returned {@code Unfiltered} must be in clustering order, or in reverse clustering
+ *      order iff {@link #isReverseOrder} returns true.
+ *   2. the iterator should not shadow its own data. That is, no deletion
+ *      (partition level deletion, row deletion, range tombstone, complex
+ *      deletion) should delete anything else returned by the iterator (cell, row, ...).
+ *   3. every "start" range tombstone marker should have a corresponding "end" marker, and no other
+ *      marker should be in-between this start-end pair of markers. Note that due to the
+ *      previous rule this means that between a "start" and a corresponding "end" marker there
+ *      can only be rows that are not deleted by the markers. Also note that when iterating
+ *      in reverse order, "end" markers are returned before their "start" counterpart (i.e.
+ *      "start" and "end" are always in the sense of the clustering order).
+ *
+ * Note further that the objects returned by next() are only valid until the
+ * next call to hasNext() or next(). A consumer that wants to keep a reference on
+ * a returned object for longer than that must explicitly make a copy of it.
+ */
+public interface UnfilteredRowIterator extends Iterator<Unfiltered>, AutoCloseable
+{
+    /**
+     * The metadata for the table this iterator is on.
+     */
+    public CFMetaData metadata();
+
+    /**
+     * A subset of the columns for the (static and regular) rows returned by this iterator.
+     * Every row returned by this iterator must guarantee that it has only those columns.
+     */
+    public PartitionColumns columns();
+
+    /**
+     * Whether or not the {@code Unfiltered} returned by this iterator are in reversed
+     * clustering order.
+     */
+    public boolean isReverseOrder();
+
+    /**
+     * The partition key of the partition this is an iterator over.
+     */
+    public DecoratedKey partitionKey();
+
+    /**
+     * The partition level deletion for the partition this iterates over.
+     */
+    public DeletionTime partitionLevelDeletion();
+
+    /**
+     * The static part corresponding to this partition (this can be an empty
+     * row).
+     */
+    public Row staticRow();
+
+    /**
+     * Return "statistics" about what is returned by this iterator. Those are used for
+     * performance reasons (for delta-encoding for instance) and code should not
+     * expect those to be exact.
+     */
+    public RowStats stats();
+
+    /**
+     * Closes this iterator. Contrarily to {@link AutoCloseable#close}, this declares
+     * no checked exception.
+     */
+    public void close();
+
+    /**
+     * Returns whether this iterator has no data (including no deletion data).
+     * <p>
+     * That is the case when the partition level deletion is live, the static row is empty
+     * and there is nothing left to iterate over. Note that this calls {@code hasNext()}
+     * (only once the first two conditions hold).
+     */
+    public default boolean isEmpty()
+    {
+        if (!partitionLevelDeletion().isLive())
+            return false;
+
+        if (!staticRow().isEmpty())
+            return false;
+
+        return !hasNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
new file mode 100644
index 0000000..13c09d4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.IOError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Serialize/Deserialize an unfiltered row iterator.
+ *
+ * The serialization is composed of a header, followed by the rows and range tombstones of the iterator serialized
+ * until we read the end of the partition (see UnfilteredSerializer for details). The header itself
+ * is:
+ *     <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>]
+ * where:
+ *     <cfid> is the table cfid.
+ *     <key> is the partition key.
+ *     <flags> contains bit flags. Each flag is set if its corresponding bit is set. From rightmost
+ *         bit to leftmost one, the flags are:
+ *         - is empty: whether the iterator is empty. If so, nothing follows the <flags>
+ *         - is reversed: whether the iterator is in reversed clustering order
+ *         - has partition deletion: whether or not there is a <partition_deletion> following
+ *         - has static row: whether or not there is a <static_row> following
+ *         - has row estimate: whether or not there is a <row_estimate> following
+ *     <s_header> is the SerializationHeader. More precisely it's
+ *           <min_timestamp><min_localDelTime><min_ttl>[<static_columns>]<columns>
+ *         where:
+ *           - <min_timestamp> is the base timestamp used for delta-encoding timestamps
+ *           - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times
+ *           - <min_ttl> is the base ttl used for delta-encoding ttls
+ *           - <static_columns> is the static columns if a static row is present. It's
+ *             the number of columns as an unsigned short, followed by the column names.
+ *           - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>.
+ *     <partition_deletion> is the deletion time for the partition (delta-encoded)
+ *     <static_row> is the static row for this partition as serialized by UnfilteredSerializer.
+ *     <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for
+ *         the purpose of some sizing on the receiving end and should not be relied upon too strongly.
+ *
+ * !!! Please note that the serialized value depends on the schema and as such should not be used as is if
+ *     it might be deserialized after the schema has changed !!!
+ * TODO: we should add a flag to include the relevant metadata in the header for commit log etc.....
+ */
+public class UnfilteredRowIteratorSerializer
+{
+    protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class);
+
+    // Bits of the <flags> byte
+    private static final int IS_EMPTY               = 0x01;
+    private static final int IS_REVERSED            = 0x02;
+    private static final int HAS_PARTITION_DELETION = 0x04;
+    private static final int HAS_STATIC_ROW         = 0x08;
+    private static final int HAS_ROW_ESTIMATE       = 0x10;
+
+    public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
+
+    /**
+     * Serializes the provided iterator without any row estimate.
+     */
+    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
+    {
+        serialize(iterator, out, version, -1);
+    }
+
+    /**
+     * Serializes the provided iterator using a {@code SerializationHeader} built from the
+     * iterator metadata, columns and stats.
+     *
+     * @param rowEstimate the row estimate to serialize, or a negative value to write none.
+     */
+    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
+    {
+        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+                                                             iterator.columns(),
+                                                             iterator.stats());
+        serialize(iterator, out, header, version, rowEstimate);
+    }
+
+    /**
+     * Serializes the provided iterator using the provided header. This consumes the iterator.
+     *
+     * @param iterator the iterator to serialize.
+     * @param out where to write the serialized iterator.
+     * @param header the header used to encode the partition deletion and the {@code Unfiltered}.
+     * @param version the version passed to the underlying serializers.
+     * @param rowEstimate the row estimate to serialize, or a negative value to write none.
+     */
+    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
+    {
+        CFMetaData.serializer.serialize(iterator.metadata(), out, version);
+        ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out);
+
+        int flags = 0;
+        if (iterator.isReverseOrder())
+            flags |= IS_REVERSED;
+
+        // For an empty iterator, nothing is written after the flags
+        if (iterator.isEmpty())
+        {
+            out.writeByte((byte)(flags | IS_EMPTY));
+            return;
+        }
+
+        DeletionTime partitionDeletion = iterator.partitionLevelDeletion();
+        if (!partitionDeletion.isLive())
+            flags |= HAS_PARTITION_DELETION;
+        Row staticRow = iterator.staticRow();
+        boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
+        if (hasStatic)
+            flags |= HAS_STATIC_ROW;
+
+        if (rowEstimate >= 0)
+            flags |= HAS_ROW_ESTIMATE;
+
+        out.writeByte((byte)flags);
+
+        SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic);
+
+        if (!partitionDeletion.isLive())
+            writeDelTime(partitionDeletion, header, out);
+
+        if (hasStatic)
+            UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
+
+        if (rowEstimate >= 0)
+            out.writeInt(rowEstimate);
+
+        while (iterator.hasNext())
+            UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
+        UnfilteredSerializer.serializer.writeEndOfPartition(out);
+    }
+
+    // Please note that this consumes the iterator, and as such should not be called unless we have a simple way to
+    // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate
+    public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate, TypeSizes sizes)
+    {
+        SerializationHeader header = new SerializationHeader(iterator.metadata(),
+                                                             iterator.columns(),
+                                                             iterator.stats());
+
+        assert rowEstimate >= 0;
+
+        long size = CFMetaData.serializer.serializedSize(iterator.metadata(), version, sizes)
+                  + sizes.sizeofWithLength(iterator.partitionKey().getKey())
+                  + 1; // flags
+
+        if (iterator.isEmpty())
+            return size;
+
+        DeletionTime partitionDeletion = iterator.partitionLevelDeletion();
+        Row staticRow = iterator.staticRow();
+        boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
+
+        size += SerializationHeader.serializer.serializedSizeForMessaging(header, sizes, hasStatic);
+
+        if (!partitionDeletion.isLive())
+            size += delTimeSerializedSize(partitionDeletion, header, sizes);
+
+        if (hasStatic)
+            size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version, sizes);
+
+        if (rowEstimate >= 0)
+            size += sizes.sizeof(rowEstimate);
+
+        while (iterator.hasNext())
+            size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version, sizes);
+        size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(sizes);
+
+        return size;
+    }
+
+    /**
+     * Deserializes everything that precedes the {@code Unfiltered} of the partition.
+     * <p>
+     * If the serialized iterator was flagged as empty, the returned {@code Header} has
+     * {@code isEmpty} set, a {@code null} partition deletion and static row, and a row
+     * estimate of 0. Otherwise the partition deletion defaults to {@code DeletionTime.LIVE},
+     * the static row to {@code Rows.EMPTY_STATIC_ROW} and the row estimate to -1 when not present.
+     */
+    public Header deserializeHeader(DataInput in, int version, SerializationHelper.Flag flag) throws IOException
+    {
+        CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in));
+        int flags = in.readUnsignedByte();
+        boolean isReversed = (flags & IS_REVERSED) != 0;
+        if ((flags & IS_EMPTY) != 0)
+        {
+            SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS);
+            return new Header(sh, metadata, key, isReversed, true, null, null, 0);
+        }
+
+        boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0;
+        boolean hasStatic = (flags & HAS_STATIC_ROW) != 0;
+        boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0;
+
+        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic);
+
+        DeletionTime partitionDeletion = hasPartitionDeletion ? readDelTime(in, header) : DeletionTime.LIVE;
+
+        Row staticRow = Rows.EMPTY_STATIC_ROW;
+        if (hasStatic)
+            staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag));
+
+        int rowEstimate = hasRowEstimate ? in.readInt() : -1;
+        return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
+    }
+
+    /**
+     * Deserializes {@code Unfiltered} into the provided writers until
+     * {@code UnfilteredSerializer} returns {@code null}. The header (see
+     * {@link #deserializeHeader}) must have been read from {@code in} beforehand.
+     */
+    public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException
+    {
+        // All the work is done by the call in the loop condition
+        while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null);
+    }
+
+    /**
+     * Deserializes an iterator: the header is read right away while the {@code Unfiltered} are
+     * read from {@code in} as the returned iterator is iterated.
+     * <p>
+     * The returned iterator reuses the same row object for every row it returns, and wraps
+     * any {@code IOException} thrown while reading in an {@code IOError}.
+     */
+    public UnfilteredRowIterator deserialize(final DataInput in, int version, SerializationHelper.Flag flag) throws IOException
+    {
+        final Header h = deserializeHeader(in, version, flag);
+
+        if (h.isEmpty)
+            return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed);
+
+        final int clusteringSize = h.metadata.clusteringColumns().size();
+        final SerializationHelper helper = new SerializationHelper(version, flag);
+
+        return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats())
+        {
+            private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter());
+            private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+
+            protected Unfiltered computeNext()
+            {
+                try
+                {
+                    Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset());
+                    if (kind == null)
+                        return endOfData();
+
+                    return kind == Unfiltered.Kind.ROW ? row : markerBuilder.build();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Writes the provided deletion time, encoding both its fields through {@code header}.
+     */
+    public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
+    {
+        out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt()));
+        out.writeInt(header.encodeDeletionTime(dt.localDeletionTime()));
+    }
+
+    /**
+     * The size of what {@link #writeDelTime} writes for the provided deletion time, as computed by {@code sizes}.
+     */
+    public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header, TypeSizes sizes)
+    {
+        return sizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt()))
+             + sizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime()));
+    }
+
+    /**
+     * Reads a deletion time written by {@link #writeDelTime}, decoding both its fields through {@code header}.
+     */
+    public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException
+    {
+        long markedAt = header.decodeTimestamp(in.readLong());
+        int localDelTime = header.decodeDeletionTime(in.readInt());
+        return new SimpleDeletionTime(markedAt, localDelTime);
+    }
+
+    /**
+     * Reads and discards a deletion time written by {@link #writeDelTime}.
+     */
+    public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException
+    {
+        // Note that since we might use VINT, we shouldn't assume the size of a long or an int
+        in.readLong();
+        in.readInt();
+    }
+
+    /**
+     * What {@link #deserializeHeader} reads: everything that precedes the {@code Unfiltered} of the partition.
+     * <p>
+     * When {@code isEmpty} is true, {@code partitionDeletion} and {@code staticRow} are {@code null}.
+     */
+    public static class Header
+    {
+        public final SerializationHeader sHeader;
+        public final CFMetaData metadata;
+        public final DecoratedKey key;
+        public final boolean isReversed;
+        public final boolean isEmpty;
+        public final DeletionTime partitionDeletion;
+        public final Row staticRow;
+        public final int rowEstimate; // -1 if no estimate
+
+        private Header(SerializationHeader sHeader,
+                       CFMetaData metadata,
+                       DecoratedKey key,
+                       boolean isReversed,
+                       boolean isEmpty,
+                       DeletionTime partitionDeletion,
+                       Row staticRow,
+                       int rowEstimate)
+        {
+            this.sHeader = sHeader;
+            this.metadata = metadata;
+            this.key = key;
+            this.isReversed = isReversed;
+            this.isEmpty = isEmpty;
+            this.partitionDeletion = partitionDeletion;
+            this.staticRow = staticRow;
+            this.rowEstimate = rowEstimate;
+        }
+
+        @Override
+        public String toString()
+        {
+            // The static row is null for the header of an empty iterator, so don't dereference it blindly
+            String staticRowString = staticRow == null ? null : staticRow.toString(metadata);
+            return String.format("{header=%s, table=%s.%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}",
+                                 sHeader, metadata.ksName, metadata.cfName, key, isReversed, isEmpty, partitionDeletion, staticRowString, rowEstimate);
+        }
+    }
+}