You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:34 UTC

[05/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 76dcf60..122f7d3 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.db.rows;
 import java.util.*;
 
 import com.google.common.collect.Iterators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SearchIterator;
 
 /**
@@ -35,110 +33,198 @@ import org.apache.cassandra.utils.SearchIterator;
  */
 public abstract class Rows
 {
-    private static final Logger logger = LoggerFactory.getLogger(Rows.class);
-
-    private Rows() {}
-
-    public static final Row EMPTY_STATIC_ROW = new AbstractRow()
+    // TODO: this empty iterator could live in a more generic place...
+    private static final SearchIterator<ColumnDefinition, ColumnData> EMPTY_SEARCH_ITERATOR = new SearchIterator<ColumnDefinition, ColumnData>()
     {
-        public Columns columns()
+        public boolean hasNext()
         {
-            return Columns.NONE;
+            return false;
         }
 
-        public LivenessInfo primaryKeyLivenessInfo()
+        public ColumnData next(ColumnDefinition column)
         {
-            return LivenessInfo.NONE;
+            return null;
         }
+    };
 
-        public DeletionTime deletion()
-        {
-            return DeletionTime.LIVE;
-        }
+    private Rows() {}
 
-        public boolean isEmpty()
-        {
-            return true;
-        }
+    public static final Row EMPTY_STATIC_ROW = ArrayBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
 
-        public boolean hasComplexDeletion()
+    public static Row.Builder copy(Row row, Row.Builder builder)
+    {
+        builder.newRow(row.clustering());
+        builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo());
+        builder.addRowDeletion(row.deletion());
+        for (ColumnData cd : row)
         {
-            return false;
+            if (cd.column().isSimple())
+            {
+                builder.addCell((Cell)cd);
+            }
+            else
+            {
+                ComplexColumnData complexData = (ComplexColumnData)cd;
+                builder.addComplexDeletion(complexData.column(), complexData.complexDeletion());
+                for (Cell cell : complexData)
+                    builder.addCell(cell);
+            }
         }
+        return builder;
+    }
 
-        public Clustering clustering()
-        {
-            return Clustering.STATIC_CLUSTERING;
-        }
+    /**
+     * Collects statistics on a given row.
+     *
+     * @param row the row for which to collect stats.
+     * @param collector the stats collector.
+     * @return the total number of cells in {@code row}.
+     */
+    public static int collectStats(Row row, PartitionStatisticsCollector collector)
+    {
+        assert !row.isEmpty();
 
-        public Cell getCell(ColumnDefinition c)
-        {
-            return null;
-        }
+        collector.update(row.primaryKeyLivenessInfo());
+        collector.update(row.deletion());
 
-        public Cell getCell(ColumnDefinition c, CellPath path)
+        int columnCount = 0;
+        int cellCount = 0;
+        for (ColumnData cd : row)
         {
-            return null;
-        }
+            if (cd.column().isSimple())
+            {
+                ++columnCount;
+                ++cellCount;
+                Cells.collectStats((Cell)cd, collector);
+            }
+            else
+            {
+                ComplexColumnData complexData = (ComplexColumnData)cd;
+                collector.update(complexData.complexDeletion());
+                if (complexData.hasCells())
+                {
+                    ++columnCount;
+                    for (Cell cell : complexData)
+                    {
+                        ++cellCount;
+                        Cells.collectStats(cell, collector);
+                    }
+                }
+            }
 
-        public Iterator<Cell> getCells(ColumnDefinition c)
-        {
-            return null;
         }
+        collector.updateColumnSetPerRow(columnCount);
+        return cellCount;
+    }
 
-        public DeletionTime getDeletion(ColumnDefinition c)
+    /**
+     * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between
+     * each input and {@code merged} to {@code diffListener}.
+     *
+     * @param merged the result of merging {@code inputs}.
+     * @param columns a superset of all the columns in any of {@code merged}/{@code inputs}.
+     * @param inputs the inputs whose merge yielded {@code merged}.
+     * @param diffListener the listener to which to signal the differences between the inputs and the merged
+     * result.
+     */
+    public static void diff(Row merged, Columns columns, Row[] inputs, RowDiffListener diffListener)
+    {
+        Clustering clustering = merged.clustering();
+        LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo();
+        DeletionTime mergedDeletion = merged.deletion().isLive() ? null : merged.deletion();
+        for (int i = 0; i < inputs.length; i++)
         {
-            return DeletionTime.LIVE;
+            Row input = inputs[i];
+            LivenessInfo inputInfo = input == null || input.primaryKeyLivenessInfo().isEmpty() ? null : input.primaryKeyLivenessInfo();
+            DeletionTime inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion();
+
+            if (mergedInfo != null || inputInfo != null)
+                diffListener.onPrimaryKeyLivenessInfo(i, clustering, mergedInfo, inputInfo);
+            if (mergedDeletion != null || inputDeletion != null)
+                diffListener.onDeletion(i, clustering, mergedDeletion, inputDeletion);
         }
 
-        public Iterator<Cell> iterator()
+        SearchIterator<ColumnDefinition, ColumnData> mergedIterator = merged.searchIterator();
+        List<SearchIterator<ColumnDefinition, ColumnData>> inputIterators = new ArrayList<>(inputs.length);
+
+        for (Row row : inputs)
+            inputIterators.add(row == null ? EMPTY_SEARCH_ITERATOR : row.searchIterator());
+
+        Iterator<ColumnDefinition> simpleColumns = columns.simpleColumns();
+        while (simpleColumns.hasNext())
         {
-            return Iterators.<Cell>emptyIterator();
+            ColumnDefinition column = simpleColumns.next();
+            Cell mergedCell = (Cell)mergedIterator.next(column);
+            for (int i = 0; i < inputs.length; i++)
+            {
+                Cell inputCell = (Cell)inputIterators.get(i).next(column);
+                if (mergedCell != null || inputCell != null)
+                    diffListener.onCell(i, clustering, mergedCell, inputCell);
+            }
         }
 
-        public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+        Iterator<ColumnDefinition> complexColumns = columns.complexColumns();
+        while (complexColumns.hasNext())
         {
-            return new SearchIterator<ColumnDefinition, ColumnData>()
+            ColumnDefinition column = complexColumns.next();
+            ComplexColumnData mergedData = (ComplexColumnData)mergedIterator.next(column);
+            // Doing one input at a time is not the most efficient, but it's a lot simpler for now
+            for (int i = 0; i < inputs.length; i++)
             {
-                public boolean hasNext()
+                ComplexColumnData inputData = (ComplexColumnData)inputIterators.get(i).next(column);
+                if (mergedData == null)
                 {
-                    return false;
+                    if (inputData == null)
+                        continue;
+
+                    // Everything in inputData has been shadowed
+                    if (!inputData.complexDeletion().isLive())
+                        diffListener.onComplexDeletion(i, clustering, column, null, inputData.complexDeletion());
+                    for (Cell inputCell : inputData)
+                        diffListener.onCell(i, clustering, null, inputCell);
                 }
-
-                public ColumnData next(ColumnDefinition column)
+                else if (inputData == null)
                 {
-                    return null;
+                    // Everything in mergedData is new (this input had no data for the column)
+                    if (!mergedData.complexDeletion().isLive())
+                        diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), null);
+                    for (Cell mergedCell : mergedData)
+                        diffListener.onCell(i, clustering, mergedCell, null);
                 }
-            };
-        }
-
-        public Kind kind()
-        {
-            return Unfiltered.Kind.ROW;
-        }
-
-        public Row takeAlias()
-        {
-            return this;
+                else
+                {
+                    PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator());
+                    PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator());
+                    while (mergedCells.hasNext() && inputCells.hasNext())
+                    {
+                        int cmp = column.cellPathComparator().compare(mergedCells.peek().path(), inputCells.peek().path());
+                        if (cmp == 0)
+                            diffListener.onCell(i, clustering, mergedCells.next(), inputCells.next());
+                        else if (cmp < 0)
+                            diffListener.onCell(i, clustering, mergedCells.next(), null);
+                        else // cmp > 0
+                            diffListener.onCell(i, clustering, null, inputCells.next());
+                    }
+                    while (mergedCells.hasNext())
+                        diffListener.onCell(i, clustering, mergedCells.next(), null);
+                    while (inputCells.hasNext())
+                        diffListener.onCell(i, clustering, null, inputCells.next());
+                }
+            }
         }
-    };
-
-    public interface SimpleMergeListener
-    {
-        public void onAdded(Cell newCell);
-        public void onRemoved(Cell removedCell);
-        public void onUpdated(Cell existingCell, Cell updatedCell);
     }
 
-    public static void writeClustering(Clustering clustering, Row.Writer writer)
+    public static Row merge(Row row1, Row row2, int nowInSec)
     {
-        for (int i = 0; i < clustering.size(); i++)
-            writer.writeClusteringValue(clustering.get(i));
+        Columns mergedColumns = row1.columns().mergeTo(row2.columns());
+        Row.Builder builder = ArrayBackedRow.sortedBuilder(mergedColumns);
+        merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
+        return builder.build();
     }
 
-    public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Writer writer, int nowInSec)
+    public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Builder builder, int nowInSec)
     {
-        merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater);
+        merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
     }
 
     // Merge rows in memtable
@@ -146,26 +232,26 @@ public abstract class Rows
     public static long merge(Row existing,
                              Row update,
                              Columns mergedColumns,
-                             Row.Writer writer,
+                             Row.Builder builder,
                              int nowInSec,
                              SecondaryIndexManager.Updater indexUpdater)
     {
         Clustering clustering = existing.clustering();
-        writeClustering(clustering, writer);
+        builder.newRow(clustering);
 
         LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
         LivenessInfo updateInfo = update.primaryKeyLivenessInfo();
-        LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo);
+        LivenessInfo mergedInfo = existingInfo.supersedes(updateInfo) ? existingInfo : updateInfo;
 
         long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());
 
         DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();
 
         if (deletion.deletes(mergedInfo))
-            mergedInfo = LivenessInfo.NONE;
+            mergedInfo = LivenessInfo.EMPTY;
 
-        writer.writePartitionKeyLivenessInfo(mergedInfo);
-        writer.writeRowDeletion(deletion);
+        builder.addPrimaryKeyLivenessInfo(mergedInfo);
+        builder.addRowDeletion(deletion);
 
         indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);
 
@@ -178,7 +264,7 @@ public abstract class Rows
                                                             existingCell,
                                                             updateCell,
                                                             deletion,
-                                                            writer,
+                                                            builder,
                                                             nowInSec,
                                                             indexUpdater));
         }
@@ -186,20 +272,22 @@ public abstract class Rows
         for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
         {
             ColumnDefinition c = mergedColumns.getComplex(i);
-            DeletionTime existingDt = existing.getDeletion(c);
-            DeletionTime updateDt = update.getDeletion(c);
+            ComplexColumnData existingData = existing.getComplexColumnData(c);
+            ComplexColumnData updateData = update.getComplexColumnData(c);
+
+            DeletionTime existingDt = existingData == null ? DeletionTime.LIVE : existingData.complexDeletion();
+            DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion();
             DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt;
             if (maxDt.supersedes(deletion))
-                writer.writeComplexDeletion(c, maxDt);
+                builder.addComplexDeletion(c, maxDt);
             else
                 maxDt = deletion;
 
-            Iterator<Cell> existingCells = existing.getCells(c);
-            Iterator<Cell> updateCells = update.getCells(c);
-            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater));
+            Iterator<Cell> existingCells = existingData == null ? null : existingData.iterator();
+            Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator();
+            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, builder, nowInSec, indexUpdater));
         }
 
-        writer.endOfRow();
         return timeDelta;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index 56b993c..6b4bc2e 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -18,12 +18,13 @@
 package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
+import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SerializationHelper
 {
@@ -38,100 +39,90 @@ public class SerializationHelper
      *      when we must ensure that deserializing and reserializing the
      *      result yield the exact same bytes. Streaming uses this.
      */
-    public static enum Flag
+    public enum Flag
     {
-        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+        LOCAL, FROM_REMOTE, PRESERVE_SIZE
     }
 
     private final Flag flag;
     public final int version;
 
-    private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo();
-
-    // The currently read row liveness infos (timestamp, ttl and localDeletionTime).
-    private long rowTimestamp;
-    private int rowTTL;
-    private int rowLocalDeletionTime;
-
     private final ColumnFilter columnsToFetch;
     private ColumnFilter.Tester tester;
 
-    public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch)
+    private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns;
+    private CFMetaData.DroppedColumn currentDroppedComplex;
+
+
+    public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch)
     {
         this.flag = flag;
         this.version = version;
         this.columnsToFetch = columnsToFetch;
+        this.droppedColumns = metadata.getDroppedColumns();
     }
 
-    public SerializationHelper(int version, Flag flag)
+    public SerializationHelper(CFMetaData metadata, int version, Flag flag)
     {
-        this(version, flag, null);
+        this(metadata, version, flag, null);
     }
 
-    public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime)
+    public Columns fetchedStaticColumns(SerializationHeader header)
     {
-        livenessInfo.setTo(timestamp, ttl, localDeletionTime);
-        writer.writePartitionKeyLivenessInfo(livenessInfo);
-
-        rowTimestamp = timestamp;
-        rowTTL = ttl;
-        rowLocalDeletionTime = localDeletionTime;
+        return columnsToFetch == null ? header.columns().statics : columnsToFetch.fetchedColumns().statics;
     }
 
-    public long getRowTimestamp()
+    public Columns fetchedRegularColumns(SerializationHeader header)
     {
-        return rowTimestamp;
+        return columnsToFetch == null ? header.columns().regulars : columnsToFetch.fetchedColumns().regulars;
     }
 
-    public int getRowTTL()
+    public boolean includes(ColumnDefinition column)
     {
-        return rowTTL;
+        return columnsToFetch == null || columnsToFetch.includes(column);
     }
 
-    public int getRowLocalDeletionTime()
+    public boolean includes(CellPath path)
     {
-        return rowLocalDeletionTime;
+        return path == null || tester == null || tester.includes(path);
     }
 
-    public boolean includes(ColumnDefinition column)
+    public boolean canSkipValue(ColumnDefinition column)
     {
-        return columnsToFetch == null || columnsToFetch.includes(column);
+        return columnsToFetch != null && columnsToFetch.canSkipValue(column);
     }
 
-    public boolean canSkipValue(ColumnDefinition column)
+    public boolean canSkipValue(CellPath path)
     {
-        return columnsToFetch != null && columnsToFetch.canSkipValue(column);
+        return path != null && tester != null && tester.canSkipValue(path);
     }
 
     public void startOfComplexColumn(ColumnDefinition column)
     {
         this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
+        this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
     }
 
-    public void endOfComplexColumn(ColumnDefinition column)
+    public void endOfComplexColumn()
     {
         this.tester = null;
     }
 
-    public void writeCell(Row.Writer writer,
-                          ColumnDefinition column,
-                          boolean isCounter,
-                          ByteBuffer value,
-                          long timestamp,
-                          int localDelTime,
-                          int ttl,
-                          CellPath path)
+    public boolean isDropped(Cell cell, boolean isComplex)
     {
-        livenessInfo.setTo(timestamp, ttl, localDelTime);
+        CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes);
+        return dropped != null && cell.timestamp() <= dropped.droppedTime;
+    }
 
-        if (isCounter && ((flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)))))
-            value = CounterContext.instance().clearAllLocal(value);
+    public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
+    {
+        return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
+    }
 
-        if (!column.isComplex() || tester == null || tester.includes(path))
-        {
-            if (tester != null && tester.canSkipValue(path))
-                value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-            writer.writeCell(column, isCounter, value, livenessInfo, path);
-        }
+    public ByteBuffer maybeClearCounterValue(ByteBuffer value)
+    {
+        return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))
+             ? CounterContext.instance().clearAllLocal(value)
+             : value;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
deleted file mode 100644
index 08f37fd..0000000
--- a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Holds cells data for the simple columns of one or more rows.
- * <p>
- * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and
- * the (simple) columns for which the {@code SimplerowDataBlock} has data for. The cell for
- * a row i and a column c is stored in the {@code CellData} at index 'i * index(c)'.
- * <p>
- * This  does mean that we store cells in a "dense" way: if column doesn't have a cell for a
- * given row, the correspond index in the cell data array will simple have a {@code null} value.
- * We might want to switch to a more sparse encoding in the future but we keep it simple for
- * now (having a sparse encoding make things a tad more complex because we need to be able to
- * swap the cells for 2 given rows as seen in ComplexRowDataBlock).
- */
-public class SimpleRowDataBlock
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false));
-
-    final Columns columns;
-    final CellData data;
-
-    public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter)
-    {
-        this.columns = columns;
-        this.data = new CellData(rows * columns.simpleColumnCount(), isCounter);
-    }
-
-    public Columns columns()
-    {
-        return columns;
-    }
-
-    // Swap row i and j
-    public void swap(int i, int j)
-    {
-        int s = columns.simpleColumnCount();
-        for (int k = 0; k < s; k++)
-            data.swapCell(i * s + k, j * s + k);
-    }
-
-    // Merge row i into j
-    public void merge(int i, int j, int nowInSec)
-    {
-        int s = columns.simpleColumnCount();
-        for (int k = 0; k < s; k++)
-            data.mergeCell(i * s + k, j * s + k, nowInSec);
-    }
-
-    // Move row i into j
-    public void move(int i, int j)
-    {
-        int s = columns.simpleColumnCount();
-        for (int k = 0; k < s; k++)
-            data.moveCell(i * s + k, j * s + k);
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + data.unsharedHeapSizeExcludingData();
-    }
-
-    public int dataSize()
-    {
-        return data.dataSize();
-    }
-
-    public CellWriter cellWriter(boolean inOrderCells)
-    {
-        return new CellWriter(inOrderCells);
-    }
-
-    public static CellData.ReusableCell reusableCell()
-    {
-        return new CellData.ReusableCell();
-    }
-
-    public static ReusableIterator reusableIterator()
-    {
-        return new ReusableIterator();
-    }
-
-    public void clear()
-    {
-        data.clear();
-    }
-
-    static class ReusableIterator extends UnmodifiableIterator<Cell>
-    {
-        private SimpleRowDataBlock dataBlock;
-        private final CellData.ReusableCell cell = new CellData.ReusableCell();
-
-        private int base;
-        private int column;
-
-        private ReusableIterator()
-        {
-        }
-
-        public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row)
-        {
-            this.dataBlock = dataBlock;
-            this.base = dataBlock == null ? -1 : row * dataBlock.columns.simpleColumnCount();
-            this.column = 0;
-            return this;
-        }
-
-        public boolean hasNext()
-        {
-            if (dataBlock == null)
-                return false;
-
-            int columnCount = dataBlock.columns.simpleColumnCount();
-            // iterate over column until we find one with data
-            while (column < columnCount && !dataBlock.data.hasCell(base + column))
-                ++column;
-
-            return column < columnCount;
-        }
-
-        public Cell next()
-        {
-            cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column);
-            ++column;
-            return cell;
-        }
-    }
-
-    public class CellWriter
-    {
-        private final boolean inOrderCells;
-
-        private int base;
-        private int lastColumnIdx;
-
-        public CellWriter(boolean inOrderCells)
-        {
-            this.inOrderCells = inOrderCells;
-        }
-
-        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
-        {
-            int fromIdx = inOrderCells ? lastColumnIdx : 0;
-            lastColumnIdx = columns.simpleIdx(column, fromIdx);
-            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx;
-            int idx = base + lastColumnIdx;
-            data.setCell(idx, value, info);
-        }
-
-        public void reset()
-        {
-            base = 0;
-            lastColumnIdx = 0;
-            data.clear();
-        }
-
-        public void endOfRow()
-        {
-            base += columns.simpleColumnCount();
-            lastColumnIdx = 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/StaticRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java
deleted file mode 100644
index 2ad9fb4..0000000
--- a/src/java/org/apache/cassandra/db/rows/StaticRow.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.utils.SearchIterator;
-
-public class StaticRow extends AbstractRow
-{
-    private final DeletionTime deletion;
-    private final RowDataBlock data;
-
-    private StaticRow(DeletionTime deletion, RowDataBlock data)
-    {
-        this.deletion = deletion.takeAlias();
-        this.data = data;
-    }
-
-    public Columns columns()
-    {
-        return data.columns();
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        assert !c.isComplex();
-        if (data.simpleData == null)
-            return null;
-
-        int idx = columns().simpleIdx(c, 0);
-        if (idx < 0)
-            return null;
-
-        return SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, idx);
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        assert c.isComplex();
-
-        ComplexRowDataBlock dataBlock = data.complexData;
-        if (dataBlock == null)
-            return null;
-
-        int idx = dataBlock.cellIdx(0, c, path);
-        if (idx < 0)
-            return null;
-
-        return SimpleRowDataBlock.reusableCell().setTo(dataBlock.cellData(0), c, idx);
-    }
-
-    public Iterator<Cell> getCells(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c);
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        return data.hasComplexDeletion(0);
-    }
-
-    public DeletionTime getDeletion(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        if (data.complexData == null)
-            return DeletionTime.LIVE;
-
-        int idx = data.complexData.complexDeletionIdx(0, c);
-        return idx < 0
-             ? DeletionTime.LIVE
-             : ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, idx);
-    }
-
-    public Iterator<Cell> iterator()
-    {
-        return RowDataBlock.reusableIterator().setTo(data, 0);
-    }
-
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return new SearchIterator<ColumnDefinition, ColumnData>()
-        {
-            private int simpleIdx = 0;
-
-            public boolean hasNext()
-            {
-                // TODO: we can do better, but we expect users to no rely on this anyway
-                return true;
-            }
-
-            public ColumnData next(ColumnDefinition column)
-            {
-                if (column.isComplex())
-                {
-                    // TODO: this is sub-optimal
-
-                    Iterator<Cell> cells = getCells(column);
-                    return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
-                }
-                else
-                {
-                    simpleIdx = columns().simpleIdx(column, simpleIdx);
-                    assert simpleIdx >= 0;
-
-                    Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, simpleIdx);
-                    ++simpleIdx;
-                    return cell == null ? null : new ColumnData(column, cell, null, null);
-                }
-            }
-        };
-    }
-
-    public Row takeAlias()
-    {
-        return this;
-    }
-
-    public Clustering clustering()
-    {
-        return Clustering.STATIC_CLUSTERING;
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return LivenessInfo.NONE;
-    }
-
-    public DeletionTime deletion()
-    {
-        return deletion;
-    }
-
-    public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter)
-    {
-        return new Builder(columns, inOrderCells, isCounter);
-    }
-
-    public static class Builder extends RowDataBlock.Writer
-    {
-        private final RowDataBlock data;
-        private DeletionTime deletion = DeletionTime.LIVE;
-
-        public Builder(Columns columns, boolean inOrderCells, boolean isCounter)
-        {
-            super(inOrderCells);
-            this.data = new RowDataBlock(columns, 1, false, isCounter);
-            updateWriter(data);
-        }
-
-        public void writeClusteringValue(ByteBuffer buffer)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void writePartitionKeyLivenessInfo(LivenessInfo info)
-        {
-            // Static rows are special and don't really have an existence unless they have live cells,
-            // so we shouldn't have any partition key liveness info.
-            assert info.equals(LivenessInfo.NONE);
-        }
-
-        public void writeRowDeletion(DeletionTime deletion)
-        {
-            this.deletion = deletion;
-        }
-
-        public StaticRow build()
-        {
-            return new StaticRow(deletion, data);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
deleted file mode 100644
index a6167ea..0000000
--- a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-
-public class TombstoneFilteringRow extends FilteringRow
-{
-    private final int nowInSec;
-
-    public TombstoneFilteringRow(int nowInSec)
-    {
-        this.nowInSec = nowInSec;
-    }
-
-    @Override
-    protected boolean include (LivenessInfo info)
-    {
-        return info.isLive(nowInSec);
-    }
-
-    @Override
-    protected boolean include(DeletionTime dt)
-    {
-        return false;
-    }
-
-    @Override
-    protected boolean include(Cell cell)
-    {
-        return cell.isLive(nowInSec);
-    }
-
-    @Override
-    protected boolean include(ColumnDefinition c, DeletionTime dt)
-    {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index b1692e3..ba03741 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -57,4 +57,14 @@ public interface Unfiltered extends Clusterable
 
     public String toString(CFMetaData metadata);
     public String toString(CFMetaData metadata, boolean fullDetails);
+
+    default boolean isRow()
+    {
+        return kind() == Kind.ROW;
+    }
+
+    default boolean isRangeTombstoneMarker()
+    {
+        return kind() == Kind.RANGE_TOMBSTONE_MARKER;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 8abd228..129ed50 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.io.IOError;
 
@@ -30,7 +29,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Serialize/Deserialize an unfiltered row iterator.
@@ -38,7 +36,7 @@ import org.apache.cassandra.utils.FBUtilities;
  * The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized
  * until we read the end of the partition (see UnfilteredSerializer for details). The header itself
  * is:
- *     <cfid><key><flags><s_header>[<partition_deletion>][<static_row>]
+ *     <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>]
  * where:
  *     <cfid> is the table cfid.
  *     <key> is the partition key.
@@ -49,23 +47,17 @@ import org.apache.cassandra.utils.FBUtilities;
  *         - has partition deletion: whether or not there is a <partition_deletion> following
  *         - has static row: whether or not there is a <static_row> following
  *         - has row estimate: whether or not there is a <row_estimate> following
- *     <s_header> is the SerializationHeader. More precisely it's
- *           <min_timetamp><min_localDelTime><min_ttl>[<static_columns>]<columns>
- *         where:
- *           - <min_timestamp> is the base timestamp used for delta-encoding timestamps
- *           - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times
- *           - <min_ttl> is the base localDeletionTime used for delta-encoding ttls
- *           - <static_columns> is the static columns if a static row is present. It's
- *             the number of columns as an unsigned short, followed by the column names.
- *           - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>.
+ *     <s_header> is the {@code SerializationHeader}. It contains in particular the columns contains in the serialized
+ *         iterator as well as other information necessary to decoding the serialized rows
+ *         (see {@code SerializationHeader.Serializer for details}).
  *     <partition_deletion> is the deletion time for the partition (delta-encoded)
  *     <static_row> is the static row for this partition as serialized by UnfilteredSerializer.
- *     <row_estimate> is the (potentially estimated) number of rows serialized. This is only use for
- *         the purpose of some sizing on the receiving end and should not be relied upon too strongly.
+ *     <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for
+ *         the purpose of sizing on the receiving end and should not be relied upon too strongly.
  *
- * !!! Please note that the serialized value depends on the schema and as such should not be used as is if
- *     it might be deserialized after the schema as changed !!!
- * TODO: we should add a flag to include the relevant metadata in the header for commit log etc.....
+ * Please note that the format described above is the on-wire format. On-disk, the format is basically the
+ * same, but the header is written once per sstable, not once per-partition. Further, the actual row and
+ * range tombstones are not written using this class, but rather by {@link ColumnIndex}.
  */
 public class UnfilteredRowIteratorSerializer
 {
@@ -79,11 +71,13 @@ public class UnfilteredRowIteratorSerializer
 
     public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
 
+    // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
     {
         serialize(iterator, out, version, -1);
     }
 
+    // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
@@ -92,6 +86,7 @@ public class UnfilteredRowIteratorSerializer
         serialize(iterator, out, header, version, rowEstimate);
     }
 
+    // Should only be used for the on-wire format.
     public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
     {
         CFMetaData.serializer.serialize(iterator.metadata(), out, version);
@@ -129,7 +124,7 @@ public class UnfilteredRowIteratorSerializer
             UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
 
         if (rowEstimate >= 0)
-            out.writeInt(rowEstimate);
+            out.writeVInt(rowEstimate);
 
         while (iterator.hasNext())
             UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
@@ -137,7 +132,7 @@ public class UnfilteredRowIteratorSerializer
     }
 
     // Please note that this consume the iterator, and as such should not be called unless we have a simple way to
-    // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate
+    // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
     public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate)
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
@@ -166,7 +161,7 @@ public class UnfilteredRowIteratorSerializer
             size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version);
 
         if (rowEstimate >= 0)
-            size += TypeSizes.sizeof(rowEstimate);
+            size += TypeSizes.sizeofVInt(rowEstimate);
 
         while (iterator.hasNext())
             size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version);
@@ -197,41 +192,29 @@ public class UnfilteredRowIteratorSerializer
 
         Row staticRow = Rows.EMPTY_STATIC_ROW;
         if (hasStatic)
-            staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag));
+            staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
 
-        int rowEstimate = hasRowEstimate ? in.readInt() : -1;
+        int rowEstimate = hasRowEstimate ? (int)in.readVInt() : -1;
         return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate);
     }
 
-    public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, Header header) throws IOException
     {
-        while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null);
-    }
-
-    public UnfilteredRowIterator deserialize(final DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
-    {
-        final Header h = deserializeHeader(in, version, flag);
-
-        if (h.isEmpty)
-            return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed);
-
-        final int clusteringSize = h.metadata.clusteringColumns().size();
-        final SerializationHelper helper = new SerializationHelper(version, flag);
+        if (header.isEmpty)
+            return UnfilteredRowIterators.emptyIterator(header.metadata, header.key, header.isReversed);
 
-        return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats())
+        final SerializationHelper helper = new SerializationHelper(header.metadata, version, flag);
+        final SerializationHeader sHeader = header.sHeader;
+        return new AbstractUnfilteredRowIterator(header.metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
         {
-            private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter());
-            private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+            private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
 
             protected Unfiltered computeNext()
             {
                 try
                 {
-                    Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset());
-                    if (kind == null)
-                        return endOfData();
-
-                    return kind == Unfiltered.Kind.ROW ? row : markerBuilder.build();
+                    Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, builder);
+                    return unfiltered == null ? endOfData() : unfiltered;
                 }
                 catch (IOException e)
                 {
@@ -241,30 +224,34 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+    {
+        return deserialize(in, version, flag,  deserializeHeader(in, version, flag));
+    }
+
     public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException
     {
-        out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt()));
-        out.writeInt(header.encodeDeletionTime(dt.localDeletionTime()));
+        out.writeVInt(header.encodeTimestamp(dt.markedForDeleteAt()));
+        out.writeVInt(header.encodeDeletionTime(dt.localDeletionTime()));
     }
 
     public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header)
     {
-        return TypeSizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt()))
-             + TypeSizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime()));
+        return TypeSizes.sizeofVInt(header.encodeTimestamp(dt.markedForDeleteAt()))
+             + TypeSizes.sizeofVInt(header.encodeDeletionTime(dt.localDeletionTime()));
     }
 
-    public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException
+    public static DeletionTime readDelTime(DataInputPlus in, SerializationHeader header) throws IOException
     {
-        long markedAt = header.decodeTimestamp(in.readLong());
-        int localDelTime = header.decodeDeletionTime(in.readInt());
-        return new SimpleDeletionTime(markedAt, localDelTime);
+        long markedAt = header.decodeTimestamp(in.readVInt());
+        int localDelTime = header.decodeDeletionTime((int)in.readVInt());
+        return new DeletionTime(markedAt, localDelTime);
     }
 
-    public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException
+    public static void skipDelTime(DataInputPlus in, SerializationHeader header) throws IOException
     {
-        // Note that since we might use VINT, we shouldn't assume the size of a long or an int
-        in.readLong();
-        in.readInt();
+        in.readVInt();
+        in.readVInt();
     }
 
     public static class Header

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 2c71cf3..6b6ec67 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.security.MessageDigest;
 
@@ -26,13 +25,10 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
@@ -49,13 +45,9 @@ public abstract class UnfilteredRowIterators
 
     public interface MergeListener
     {
-        public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
-
-        public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions);
-        public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions);
-        public void onMergedCells(Cell mergedCell, Cell[] versions);
-        public void onRowDone();
+        public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
 
+        public void onMergedRows(Row merged, Columns columns, Row[] versions);
         public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
 
         public void close();
@@ -87,14 +79,13 @@ public abstract class UnfilteredRowIterators
     }
 
     /**
-     * Returns an iterator that is the result of merging other iterators, and using
+     * Returns an iterator that is the result of merging other iterators, and (optionally) using
      * specific MergeListener.
      *
      * Note that this method assumes that there is at least 2 iterators to merge.
      */
     public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
     {
-        assert mergeListener != null;
         return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
     }
 
@@ -175,10 +166,7 @@ public abstract class UnfilteredRowIterators
         while (iterator.hasNext())
         {
             Unfiltered unfiltered = iterator.next();
-            if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                ((Row) unfiltered).digest(digest);
-            else
-                ((RangeTombstoneMarker) unfiltered).digest(digest);
+            unfiltered.digest(digest);
         }
     }
 
@@ -198,12 +186,12 @@ public abstract class UnfilteredRowIterators
             && iter1.staticRow().equals(iter2.staticRow());
 
         return new AbstractUnfilteredRowIterator(iter1.metadata(),
-                                        iter1.partitionKey(),
-                                        iter1.partitionLevelDeletion(),
-                                        iter1.columns(),
-                                        iter1.staticRow(),
-                                        iter1.isReverseOrder(),
-                                        iter1.stats())
+                                                 iter1.partitionKey(),
+                                                 iter1.partitionLevelDeletion(),
+                                                 iter1.columns(),
+                                                 iter1.staticRow(),
+                                                 iter1.isReverseOrder(),
+                                                 iter1.stats())
         {
             protected Unfiltered computeNext()
             {
@@ -230,155 +218,35 @@ public abstract class UnfilteredRowIterators
 
     public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
     {
-        return new WrappingUnfilteredRowIterator(iterator)
+        return new AlteringUnfilteredRowIterator(iterator)
         {
-            private final CloningRow cloningRow = new CloningRow();
-            private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size());
-
-            public Unfiltered next()
-            {
-                Unfiltered next = super.next();
-                return next.kind() == Unfiltered.Kind.ROW
-                     ? cloningRow.setTo((Row)next)
-                     : clone((RangeTombstoneMarker)next);
-            }
-
-            private RangeTombstoneMarker clone(RangeTombstoneMarker marker)
-            {
-                markerBuilder.reset();
+            private Row.Builder regularBuilder;
 
-                RangeTombstone.Bound bound = marker.clustering();
-                for (int i = 0; i < bound.size(); i++)
-                    markerBuilder.writeClusteringValue(allocator.clone(bound.get(i)));
-                markerBuilder.writeBoundKind(bound.kind());
-                if (marker.isBoundary())
-                {
-                    RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker;
-                    markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime());
-                }
-                else
-                {
-                    markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime());
-                }
-                markerBuilder.endOfMarker();
-                return markerBuilder.build();
-            }
-
-            class CloningRow extends WrappingRow
+            @Override
+            protected Row computeNextStatic(Row row)
             {
-                private final CloningClustering cloningClustering = new CloningClustering();
-                private final CloningCell cloningCell = new CloningCell();
-
-                protected Cell filterCell(Cell cell)
-                {
-                    return cloningCell.setTo(cell);
-                }
-
-                @Override
-                public Clustering clustering()
-                {
-                    return cloningClustering.setTo(super.clustering());
-                }
+                Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics);
+                return Rows.copy(row, staticBuilder).build();
             }
 
-            class CloningClustering extends Clustering
+            @Override
+            protected Row computeNext(Row row)
             {
-                private Clustering wrapped;
-
-                public Clustering setTo(Clustering wrapped)
-                {
-                    this.wrapped = wrapped;
-                    return this;
-                }
-
-                public int size()
-                {
-                    return wrapped.size();
-                }
-
-                public ByteBuffer get(int i)
-                {
-                    ByteBuffer value = wrapped.get(i);
-                    return value == null ? null : allocator.clone(value);
-                }
+                if (regularBuilder == null)
+                    regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars);
 
-                public ByteBuffer[] getRawValues()
-                {
-                    throw new UnsupportedOperationException();
-                }
+                return Rows.copy(row, regularBuilder).build();
             }
 
-            class CloningCell extends AbstractCell
+            @Override
+            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
             {
-                private Cell wrapped;
-
-                public Cell setTo(Cell wrapped)
-                {
-                    this.wrapped = wrapped;
-                    return this;
-                }
-
-                public ColumnDefinition column()
-                {
-                    return wrapped.column();
-                }
-
-                public boolean isCounterCell()
-                {
-                    return wrapped.isCounterCell();
-                }
-
-                public ByteBuffer value()
-                {
-                    return allocator.clone(wrapped.value());
-                }
-
-                public LivenessInfo livenessInfo()
-                {
-                    return wrapped.livenessInfo();
-                }
-
-                public CellPath path()
-                {
-                    CellPath path = wrapped.path();
-                    if (path == null)
-                        return null;
-
-                    assert path.size() == 1;
-                    return CellPath.create(allocator.clone(path.get(0)));
-                }
+                return marker.copy(allocator);
             }
         };
     }
 
     /**
-     * Turns the given iterator into an update.
-     *
-     * Warning: this method does not close the provided iterator, it is up to
-     * the caller to close it.
-     */
-    public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator)
-    {
-        PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);
-
-        update.addPartitionDeletion(iterator.partitionLevelDeletion());
-
-        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
-            iterator.staticRow().copyTo(update.staticWriter());
-
-        while (iterator.hasNext())
-        {
-            Unfiltered unfiltered = iterator.next();
-            if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                ((Row) unfiltered).copyTo(update.writer());
-            else
-                ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder()));
-        }
-
-        return update;
-    }
-
-    /**
      * Validate that the data of the provided iterator is valid, that is that the values
      * it contains are valid for the type they represent, and more generally that the
      * infos stored are sensible.
@@ -393,15 +261,34 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
     {
-        return new WrappingUnfilteredRowIterator(iterator)
+        return new AlteringUnfilteredRowIterator(iterator)
         {
-            public Unfiltered next()
+            @Override
+            protected Row computeNextStatic(Row row)
+            {
+                validate(row);
+                return row;
+            }
+
+            @Override
+            protected Row computeNext(Row row)
+            {
+                validate(row);
+                return row;
+            }
+
+            @Override
+            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+            {
+                validate(marker);
+                return marker;
+            }
+
+            private void validate(Unfiltered unfiltered)
             {
-                Unfiltered next = super.next();
                 try
                 {
-                    next.validateData(metadata());
-                    return next;
+                    unfiltered.validateData(iterator.metadata());
                 }
                 catch (MarshalException me)
                 {
@@ -412,56 +299,6 @@ public abstract class UnfilteredRowIterators
     }
 
     /**
-     * Convert all expired cells to equivalent tombstones.
-     * <p>
-     * Once a cell expires, it acts exactly as a tombstone and this until it is purged. But in particular that
-     * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to
-     * replace the expired cell by an equivalent tombstone (that has no value).
-     *
-     * @param iterator the iterator in which to conver expired cells.
-     * @param nowInSec the current time to use to decide if a cell is expired.
-     * @return an iterator that returns the same data than {@code iterator} but with all expired cells converted
-     * to equivalent tombstones.
-     */
-    public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec)
-    {
-        return new FilteringRowIterator(iterator)
-        {
-            protected FilteringRow makeRowFilter()
-            {
-                return new FilteringRow()
-                {
-                    @Override
-                    protected Cell filterCell(Cell cell)
-                    {
-                        Cell filtered = super.filterCell(cell);
-                        if (filtered == null)
-                            return null;
-
-                        LivenessInfo info = filtered.livenessInfo();
-                        if (info.hasTTL() && !filtered.isLive(nowInSec))
-                        {
-                            // The column is now expired, we can safely return a simple tombstone. Note that as long as the expiring
-                            // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility
-                            // to repair. See discussion at
-                            // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
-                            return Cells.create(filtered.column(),
-                                                filtered.isCounterCell(),
-                                                ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()),
-                                                filtered.path());
-                        }
-                        else
-                        {
-                            return filtered;
-                        }
-                    }
-                };
-            }
-        };
-    }
-
-    /**
      * Wraps the provided iterator so it logs the returned atoms for debugging purposes.
      * <p>
      * Note that this is only meant for debugging as this can log a very large amount of
@@ -478,26 +315,28 @@ public abstract class UnfilteredRowIterators
                     iterator.isReverseOrder(),
                     iterator.partitionLevelDeletion().markedForDeleteAt());
 
-        return new WrappingUnfilteredRowIterator(iterator)
+        return new AlteringUnfilteredRowIterator(iterator)
         {
             @Override
-            public Row staticRow()
+            protected Row computeNextStatic(Row row)
             {
-                Row row = super.staticRow();
                 if (!row.isEmpty())
                     logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
                 return row;
             }
 
             @Override
-            public Unfiltered next()
+            protected Row computeNext(Row row)
             {
-                Unfiltered next = super.next();
-                if (next.kind() == Unfiltered.Kind.ROW)
-                    logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails));
-                else
-                    logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata()));
-                return next;
+                logger.info("[{}] {}", id, row.toString(metadata(), fullDetails));
+                return row;
+            }
+
+            @Override
+            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+            {
+                logger.info("[{}] {}", id, marker.toString(metadata()));
+                return marker;
             }
         };
     }
@@ -526,10 +365,10 @@ public abstract class UnfilteredRowIterators
                   reversed,
                   mergeStats(iterators));
 
-            this.listener = listener;
             this.mergeIterator = MergeIterator.get(iterators,
                                                    reversed ? metadata.comparator.reversed() : metadata.comparator,
-                                                   new MergeReducer(metadata, iterators.size(), reversed, nowInSec));
+                                                   new MergeReducer(iterators.size(), reversed, nowInSec, listener));
+            this.listener = listener;
         }
 
         private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
@@ -591,7 +430,7 @@ public abstract class UnfilteredRowIterators
                     delTime = iterDeletion;
             }
             if (listener != null && !delTime.isLive())
-                listener.onMergePartitionLevelDeletion(delTime, versions);
+                listener.onMergedPartitionLevelDeletion(delTime, versions);
             return delTime;
         }
 
@@ -605,14 +444,19 @@ public abstract class UnfilteredRowIterators
             if (columns.isEmpty())
                 return Rows.EMPTY_STATIC_ROW;
 
-            Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener);
+            if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty()))
+                return Rows.EMPTY_STATIC_ROW;
+
+            Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns);
             for (int i = 0; i < iterators.size(); i++)
                 merger.add(i, iterators.get(i).staticRow());
 
-            // Note that we should call 'takeAlias' on the result in theory, but we know that we
-            // won't reuse the merger and so that it's ok not to.
             Row merged = merger.merge(partitionDeletion);
-            return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
+            if (merged == null)
+                merged = Rows.EMPTY_STATIC_ROW;
+            if (listener != null)
+                listener.onMergedRows(merged, columns, merger.mergedRows());
+            return merged;
         }
 
         private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
@@ -659,26 +503,26 @@ public abstract class UnfilteredRowIterators
                 listener.close();
         }
 
-        /**
-         * Specific reducer for merge operations that rewrite the same reusable
-         * row every time. This also skip cells shadowed by range tombstones when writing.
-         */
         private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
         {
+            private final MergeListener listener;
+
             private Unfiltered.Kind nextKind;
 
             private final Row.Merger rowMerger;
             private final RangeTombstoneMarker.Merger markerMerger;
 
-            private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec)
+            private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener)
             {
-                this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener);
-                this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener);
+                this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars);
+                this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed);
+                this.listener = listener;
             }
 
             @Override
             public boolean trivialReduceIsTrivial()
             {
+                // If we have a listener, we must signal it even when we have a single version
                 return listener == null;
             }
 
@@ -693,9 +537,20 @@ public abstract class UnfilteredRowIterators
 
             protected Unfiltered getReduced()
             {
-                return nextKind == Unfiltered.Kind.ROW
-                     ? rowMerger.merge(markerMerger.activeDeletion())
-                     : markerMerger.merge();
+                if (nextKind == Unfiltered.Kind.ROW)
+                {
+                    Row merged = rowMerger.merge(markerMerger.activeDeletion());
+                    if (listener != null)
+                        listener.onMergedRows(merged == null ? ArrayBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+                    return merged;
+                }
+                else
+                {
+                    RangeTombstoneMarker merged = markerMerger.merge();
+                    if (merged != null && listener != null)
+                        listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
+                    return merged;
+                }
             }
 
             protected void onKeyChange()
@@ -712,13 +567,11 @@ public abstract class UnfilteredRowIterators
     {
         private final UnfilteredRowIterator iter;
         private final int nowInSec;
-        private final TombstoneFilteringRow filter;
 
         public FilteringIterator(UnfilteredRowIterator iter, int nowInSec)
         {
             this.iter = iter;
             this.nowInSec = nowInSec;
-            this.filter = new TombstoneFilteringRow(nowInSec);
         }
 
         public CFMetaData metadata()
@@ -744,7 +597,11 @@ public abstract class UnfilteredRowIterators
         public Row staticRow()
         {
             Row row = iter.staticRow();
-            return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row);
+            if (row.isEmpty())
+                return Rows.EMPTY_STATIC_ROW;
+
+            row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+            return row == null ? Rows.EMPTY_STATIC_ROW : row;
         }
 
         protected Row computeNext()
@@ -752,11 +609,11 @@ public abstract class UnfilteredRowIterators
             while (iter.hasNext())
             {
                 Unfiltered next = iter.next();
-                if (next.kind() != Unfiltered.Kind.ROW)
+                if (next.isRangeTombstoneMarker())
                     continue;
 
-                Row row = filter.setTo((Row)next);
-                if (!row.isEmpty())
+                Row row = ((Row)next).purge(DeletionPurger.PURGE_ALL, nowInSec);
+                if (row != null)
                     return row;
             }
             return endOfData();