You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:36 UTC

[07/15] cassandra git commit: Simplify some 8099's implementations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index ea472eb..95bad48 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -17,45 +17,66 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.Iterator;
+import java.security.MessageDigest;
+import java.util.Comparator;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.serializers.MarshalException;
 
-public class ColumnData
+/**
+ * Generic interface for the data of a given column (inside a row).
+ *
+ * In practice, there is only 2 implementations of this: either {@link Cell} for simple columns
+ * or {@code ComplexColumnData} for complex columns.
+ */
+public interface ColumnData
 {
-    private final ColumnDefinition column;
-    private final Cell cell;
-    private final Iterator<Cell> cells;
-    private final DeletionTime complexDeletion;
-
-    ColumnData(ColumnDefinition column, Cell cell, Iterator<Cell> cells, DeletionTime complexDeletion)
-    {
-        assert column != null && (cell != null || (column.isComplex() && cells != null && complexDeletion != null));
-
-        this.column = column;
-        this.cell = cell;
-        this.cells = cells;
-        this.complexDeletion = complexDeletion;
-    }
-
-    public ColumnDefinition column()
-    {
-        return column;
-    }
-
-    public Cell cell()
-    {
-        return cell;
-    }
-
-    public Iterator<Cell> cells()
-    {
-        return cells;
-    }
-
-    public DeletionTime complexDeletion()
-    {
-        return complexDeletion;
-    }
+    public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
+
+    // A comparator for the cells of the *similar* ColumnData, i.e. one that assumes the cells are all for the same column.
+    public static final Comparator<Cell> cellComparator = (c1, c2) -> c1.column().cellPathComparator().compare(c1.path(), c2.path());
+
+    /**
+     * The column this is data for.
+     *
+     * @return the column this is a data for.
+     */
+    public ColumnDefinition column();
+
+    /**
+     * The size of the data hold by this {@code ColumnData}.
+     *
+     * @return the size used by the data of this {@code ColumnData}.
+     */
+    public int dataSize();
+
+    public long unsharedHeapSizeExcludingData();
+
+    /**
+     * Validate the column data.
+     *
+     * @throws MarshalException if the data is not valid.
+     */
+    public void validate();
+
+    /**
+     * Adds the data to the provided digest.
+     *
+     * @param digest the {@code MessageDigest} to add the data to.
+     */
+    public void digest(MessageDigest digest);
+
+    /**
+     * Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and
+     * all deletion timestamp by {@code newTimestamp - 1}.
+     *
+     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+     */
+    public ColumnData updateAllTimestamp(long newTimestamp);
+
+    public ColumnData markCounterLocalToBeCleared();
+
+    public ColumnData purge(DeletionPurger purger, int nowInSec);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
new file mode 100644
index 0000000..d87402a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * The data for a complex column, that is it's cells and potential complex
+ * deletion time.
+ */
+public class ComplexColumnData implements ColumnData, Iterable<Cell>
+{
+    static final Cell[] NO_CELLS = new Cell[0];
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnDefinition.regularDef("", "", "", SetType.getInstance(ByteType.instance, true)), NO_CELLS, new DeletionTime(0, 0)));
+
+    private final ColumnDefinition column;
+
+    // The cells for 'column' sorted by cell path.
+    private final Cell[] cells;
+
+    private final DeletionTime complexDeletion;
+
+    // Only ArrayBackedRow should call this.
+    ComplexColumnData(ColumnDefinition column, Cell[] cells, DeletionTime complexDeletion)
+    {
+        assert column.isComplex();
+        assert cells.length > 0 || !complexDeletion.isLive();
+        this.column = column;
+        this.cells = cells;
+        this.complexDeletion = complexDeletion;
+    }
+
+    public boolean hasCells()
+    {
+        return cellsCount() > 0;
+    }
+
+    public int cellsCount()
+    {
+        return cells.length;
+    }
+
+    public ColumnDefinition column()
+    {
+        return column;
+    }
+
+    public Cell getCell(CellPath path)
+    {
+        int idx = binarySearch(path);
+        return idx < 0 ? null : cells[idx];
+    }
+
+    public Cell getCellByIndex(int i)
+    {
+        assert 0 <= i && i < cells.length;
+        return cells[i];
+    }
+
+    /**
+     * The complex deletion time of the complex column.
+     * <p>
+     * The returned "complex deletion" is a deletion of all the cells of the column. For instance,
+     * for a collection, this correspond to a full collection deletion.
+     * Please note that this deletion says nothing about the individual cells of the complex column:
+     * there can be no complex deletion but some of the individual cells can be deleted.
+     *
+     * @return the complex deletion time for the column this is the data of or {@code DeletionTime.LIVE}
+     * if the column is not deleted.
+     */
+    public DeletionTime complexDeletion()
+    {
+        return complexDeletion;
+    }
+
+    public Iterator<Cell> iterator()
+    {
+        return Iterators.forArray(cells);
+    }
+
+    public int dataSize()
+    {
+        int size = complexDeletion.dataSize();
+        for (Cell cell : cells)
+            size += cell.dataSize();
+        return size;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
+        for (Cell cell : cells)
+            heapSize += cell.unsharedHeapSizeExcludingData();
+        return heapSize;
+    }
+
+    public void validate()
+    {
+        for (Cell cell : cells)
+            cell.validate();
+    }
+
+    public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
+    {
+        ColumnFilter.Tester cellTester = filter.newTester(column);
+        if (cellTester == null && activeDeletion.isLive() && dropped == null)
+            return this;
+
+        DeletionTime newComplexDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+
+        int newSize = 0;
+        for (Cell cell : cells)
+        {
+            // The cell must be:
+            //   - Included by the query
+            //   - not shadowed by the active deletion
+            //   - not being for a dropped column
+            if ((cellTester == null || cellTester.includes(cell.path()))
+                 && !activeDeletion.deletes(cell)
+                 && (dropped == null || cell.timestamp() > dropped.droppedTime))
+                ++newSize;
+        }
+
+
+        if (newSize == 0)
+            return newComplexDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newComplexDeletion);
+
+        if (newSize == cells.length && newComplexDeletion == complexDeletion)
+            return this;
+
+        Cell[] newCells = new Cell[newSize];
+        int j = 0;
+        cellTester = filter.newTester(column); // we need to reste the tester
+        for (Cell cell : cells)
+        {
+            if ((cellTester == null || cellTester.includes(cell.path()))
+                && !activeDeletion.deletes(cell)
+                && (dropped == null || cell.timestamp() > dropped.droppedTime))
+                newCells[j++] = cell;
+        }
+        assert j == newSize;
+
+        return new ComplexColumnData(column, newCells, newComplexDeletion);
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        if (!complexDeletion.isLive())
+            complexDeletion.digest(digest);
+
+        for (Cell cell : cells)
+            cell.digest(digest);
+    }
+
+    public ComplexColumnData markCounterLocalToBeCleared()
+    {
+        Cell[] newCells = null;
+        for (int i = 0; i < cells.length; i++)
+        {
+            Cell cell = cells[i];
+            Cell marked = cell.markCounterLocalToBeCleared();
+            if (marked != cell)
+            {
+                if (newCells == null)
+                    newCells = Arrays.copyOf(cells, cells.length);
+                newCells[i] = marked;
+            }
+        }
+
+        return newCells == null
+             ? this
+             : new ComplexColumnData(column, newCells, complexDeletion);
+    }
+
+    public ComplexColumnData purge(DeletionPurger purger, int nowInSec)
+    {
+        DeletionTime newDeletion = complexDeletion.isLive() || purger.shouldPurge(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+
+        int newSize = 0;
+        for (Cell cell : cells)
+        {
+            Cell purged = cell.purge(purger, nowInSec);
+            if (purged != null)
+                ++newSize;
+        }
+
+        if (newSize == 0)
+            return newDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newDeletion);
+
+        if (newDeletion == complexDeletion && newSize == cells.length)
+            return this;
+
+        Cell[] newCells = new Cell[newSize];
+        int j = 0;
+        for (Cell cell : cells)
+        {
+            Cell purged = cell.purge(purger, nowInSec);
+            if (purged != null)
+                newCells[j++] = purged;
+        }
+        assert j == newSize;
+
+        return new ComplexColumnData(column, newCells, newDeletion);
+    }
+
+    public ComplexColumnData updateAllTimestamp(long newTimestamp)
+    {
+        DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion : new DeletionTime(newTimestamp - 1, complexDeletion.localDeletionTime());
+        Cell[] newCells = new Cell[cells.length];
+        for (int i = 0; i < cells.length; i++)
+            newCells[i] = (Cell)cells[i].updateAllTimestamp(newTimestamp);
+
+        return new ComplexColumnData(column, newCells, newDeletion);
+    }
+
+    // This is the partner in crime of ArrayBackedRow.setValue. The exact warning apply. The short
+    // version is: "don't use that method".
+    void setValue(CellPath path, ByteBuffer value)
+    {
+        int idx = binarySearch(path);
+        assert idx >= 0;
+        cells[idx] = cells[idx].withUpdatedValue(value);
+    }
+
+    private int binarySearch(CellPath path)
+    {
+        return binarySearch(path, 0, cells.length);
+    }
+
+    /**
+     * Simple binary search for a given cell (in the cells array).
+     *
+     * The return value has the exact same meaning that the one of Collections.binarySearch() but
+     * we don't use the later because we're searching for a 'CellPath' in an array of 'Cell'.
+     */
+    private int binarySearch(CellPath path, int fromIndex, int toIndex)
+    {
+        int low = fromIndex;
+        int mid = toIndex;
+        int high = mid - 1;
+        int result = -1;
+        while (low <= high)
+        {
+            mid = (low + high) >> 1;
+            if ((result = column.cellPathComparator().compare(path, cells[mid].path())) > 0)
+                low = mid + 1;
+            else if (result == 0)
+                return mid;
+            else
+                high = mid - 1;
+        }
+        return -mid - (result < 0 ? 1 : 2);
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (this == other)
+            return true;
+
+        if(!(other instanceof ComplexColumnData))
+            return false;
+
+        ComplexColumnData that = (ComplexColumnData)other;
+        return this.column().equals(that.column())
+            && this.complexDeletion().equals(that.complexDeletion)
+            && Arrays.equals(this.cells, that.cells);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(column(), complexDeletion(), cells);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static class Builder
+    {
+        private ColumnDefinition column;
+        private DeletionTime complexDeletion;
+        public final List<Cell> cells = new ArrayList<>();
+
+        public void newColumn(ColumnDefinition column)
+        {
+            this.column = column;
+            this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called
+            this.cells.clear();
+        }
+
+        public void addComplexDeletion(DeletionTime complexDeletion)
+        {
+            this.complexDeletion = complexDeletion;
+        }
+
+        public void addCell(Cell cell)
+        {
+            assert cell.column().equals(column);
+            assert cells.isEmpty() || cell.column().cellPathComparator().compare(cells.get(cells.size() - 1).path(), cell.path()) < 0;
+            cells.add(cell);
+        }
+
+        public ComplexColumnData build()
+        {
+            if (complexDeletion.isLive() && cells.isEmpty())
+                return null;
+
+            return new ComplexColumnData(column, cells.toArray(new Cell[cells.size()]), complexDeletion);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
deleted file mode 100644
index 75df874..0000000
--- a/src/java/org/apache/cassandra/db/rows/ComplexRowDataBlock.java
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.UnmodifiableIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Holds cells data and complex deletions for the complex columns of one or more rows.
- * <p>
- * Contrarily to {@code SimpleRowDataBlock}, each complex column can have multiple cells and
- * we thus can't use a similar dense encoding. Instead, we still store the actual cell data
- * in a {@code CellData} object, but we add a level of indirection (the cellIdx array in
- * {@link ComplexCellBlock}) which for every column of every row stores 2 indexes: the index
- * in the {@code CellData} where the first cell for this column is, and the the index of the
- * last cell (or rather, the index to the first cell that does not belong to that column).
- * <p>
- * What makes this a little bit more complicated however is that in some cases (for
- * {@link PartitionUpdate} typically), we need to be able to swap rows inside a
- * {@code ComplexRowDataBlock} and the extra level of indirection makes that more complex.
- * So in practice, we have 2 separate sub-implementation of a {@code ComplexRowDataBlock}:
- *   - The first one, {@code SimpleComplexRowDataBlock} does not support swapping rows
- *     (and is thus only used when we don't need to) but it uses a single {@code CellData}
- *     for all the rows stored.
- *   - The second one, {@code SortableComplexRowDataBlock}, uses one separate {@code CellData}
- *     per row (in fact, a {@code ComplexCellBlock} which groups the cell data with the
- *     indexing array discussed above) and simply keeps those per-row block in a list. It
- *     is thus less compact in memory but make the swapping of rows trivial.
- */
-public abstract class ComplexRowDataBlock
-{
-    private static final Logger logger = LoggerFactory.getLogger(ComplexRowDataBlock.class);
-
-    private final Columns columns;
-
-    // For each complex column, it's deletion time (if any): the nth complex column of row i
-    // will have it's deletion time at complexDelTimes[(i * ccs) + n] where ccs it the number
-    // of complex columns in 'columns'.
-    final DeletionTimeArray complexDelTimes;
-
-    protected ComplexRowDataBlock(Columns columns, int rows)
-    {
-        this.columns = columns;
-
-        int columnCount = rows * columns.complexColumnCount();
-        this.complexDelTimes = new DeletionTimeArray(columnCount);
-    }
-
-    public static ComplexRowDataBlock create(Columns columns, int rows, boolean sortable, boolean isCounter)
-    {
-        return sortable
-             ? new SortableComplexRowDataBlock(columns, rows, isCounter)
-             : new SimpleComplexRowDataBlock(columns, rows, isCounter);
-    }
-
-    public Columns columns()
-    {
-        return columns;
-    }
-
-    public CellData cellData(int row)
-    {
-        return cellBlock(row).data;
-    }
-
-    public int cellIdx(int row, ColumnDefinition c, CellPath path)
-    {
-        ComplexCellBlock block = cellBlock(row);
-        if (block == null)
-            return -1;
-
-        int base = cellBlockBase(row);
-        int i = base + 2 * columns.complexIdx(c, 0);
-
-        int start = block.cellIdx[i];
-        int end = block.cellIdx[i+1];
-
-        if (i >= block.cellIdx.length || end <= start)
-            return -1;
-
-        return Arrays.binarySearch(block.complexPaths, start, end, path, c.cellPathComparator());
-    }
-
-    // The following methods abstract the fact that we have 2 sub-implementations: both
-    // implementation will use a ComplexCellBlock to store a row, but one will use one
-    // ComplexCellBlock per row, while the other will store all rows into the same block.
-
-    // Returns the cell block for a given row. Can return null if the asked row has no data.
-    protected abstract ComplexCellBlock cellBlock(int row);
-    // Same as cellBlock(), but create the proper block if the row doesn't exists and return it.
-    protected abstract ComplexCellBlock cellBlockForWritting(int row);
-    // The index in the block returned by cellBlock()/cellBlockFroWriting() where the row starts.
-    protected abstract int cellBlockBase(int row);
-
-    protected abstract void swapCells(int i, int j);
-    protected abstract void mergeCells(int i, int j, int nowInSec);
-    protected abstract void moveCells(int i, int j);
-
-    protected abstract long cellDataUnsharedHeapSizeExcludingData();
-    protected abstract int dataCellSize();
-    protected abstract void clearCellData();
-
-    // Swap row i and j
-    public void swap(int i, int j)
-    {
-        swapCells(i, j);
-
-        int s = columns.complexColumnCount();
-        for (int k = 0; k < s; k++)
-            complexDelTimes.swap(i * s + k, j * s + k);
-    }
-
-    // Merge row i into j
-    public void merge(int i, int j, int nowInSec)
-    {
-        assert i > j;
-
-        mergeCells(i, j, nowInSec);
-
-        int s = columns.complexColumnCount();
-        if (i * s >= complexDelTimes.size())
-            return;
-
-        for (int k = 0; k < s; k++)
-            if (complexDelTimes.supersedes(i * s + k, j * s + k))
-                complexDelTimes.move(i * s + k, j * s + k);
-    }
-
-    // Move row i into j
-    public void move(int i, int j)
-    {
-        moveCells(i, j);
-        ensureDelTimesCapacity(Math.max(i, j));
-        int s = columns.complexColumnCount();
-        for (int k = 0; k < s; k++)
-            complexDelTimes.move(i * s + k, j * s + k);
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return cellDataUnsharedHeapSizeExcludingData() + complexDelTimes.unsharedHeapSize();
-    }
-
-    public int dataSize()
-    {
-        return dataCellSize() + complexDelTimes.dataSize();
-    }
-
-    public CellWriter cellWriter(boolean inOrderCells)
-    {
-        return new CellWriter(inOrderCells);
-    }
-
-    public int complexDeletionIdx(int row, ColumnDefinition column)
-    {
-        int baseIdx = columns.complexIdx(column, 0);
-        if (baseIdx < 0)
-            return -1;
-
-        int idx = (row * columns.complexColumnCount()) + baseIdx;
-        return idx < complexDelTimes.size() ? idx : -1;
-    }
-
-    public boolean hasComplexDeletion(int row)
-    {
-        int base = row * columns.complexColumnCount();
-        for (int i = base; i < base + columns.complexColumnCount(); i++)
-            if (!complexDelTimes.isLive(i))
-                return true;
-        return false;
-    }
-
-    public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path)
-    {
-        CellData data = cellData(row);
-        assert data != null;
-        int idx = cellIdx(row, column, path);
-        return data.value(idx);
-    }
-
-    public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value)
-    {
-        CellData data = cellData(row);
-        assert data != null;
-        int idx = cellIdx(row, column, path);
-        data.setValue(idx, value);
-    }
-
-    public static ReusableIterator reusableComplexCells()
-    {
-        return new ReusableIterator();
-    }
-
-    public static DeletionTimeArray.Cursor complexDeletionCursor()
-    {
-        return new DeletionTimeArray.Cursor();
-    }
-
-    public static ReusableIterator reusableIterator()
-    {
-        return new ReusableIterator();
-    }
-
-    public void clear()
-    {
-        clearCellData();
-        complexDelTimes.clear();
-    }
-
-    private void ensureDelTimesCapacity(int rowToSet)
-    {
-        int originalCapacity = complexDelTimes.size() / columns.complexColumnCount();
-        if (rowToSet < originalCapacity)
-            return;
-
-        int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
-        complexDelTimes.resize(newCapacity * columns.complexColumnCount());
-    }
-
-    /**
-     * Simple sub-implementation that doesn't support swapping/sorting rows.
-     * The cell data for every row is stored in the same contiguous {@code ComplexCellBloc}
-     * object.
-     */
-    private static class SimpleComplexRowDataBlock extends ComplexRowDataBlock
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleComplexRowDataBlock(Columns.NONE, 0, false));
-
-        private final ComplexCellBlock cells;
-
-        private SimpleComplexRowDataBlock(Columns columns, int rows, boolean isCounter)
-        {
-            super(columns, rows);
-            this.cells = new ComplexCellBlock(columns, rows, isCounter);
-        }
-
-        protected ComplexCellBlock cellBlock(int row)
-        {
-            return cells;
-        }
-
-        protected ComplexCellBlock cellBlockForWritting(int row)
-        {
-            cells.ensureCapacity(row);
-            return cells;
-        }
-
-        protected int cellBlockBase(int row)
-        {
-            return 2 * row * columns().complexColumnCount();
-        }
-
-        // Swap cells from row i and j
-        public void swapCells(int i, int j)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        // Merge cells from row i into j
-        public void mergeCells(int i, int j, int nowInSec)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        // Move cells from row i into j
-        public void moveCells(int i, int j)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        protected long cellDataUnsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE + cells.unsharedHeapSizeExcludingData();
-        }
-
-        protected int dataCellSize()
-        {
-            return cells.dataSize();
-        }
-
-        protected void clearCellData()
-        {
-            cells.clear();
-        }
-    }
-
-    /**
-     * Sub-implementation that support swapping/sorting rows.
-     * The data for each row is stored in a different {@code ComplexCellBlock} object,
-     * making swapping rows easy.
-     */
-    private static class SortableComplexRowDataBlock extends ComplexRowDataBlock
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new SortableComplexRowDataBlock(Columns.NONE, 0, false));
-
-        // The cell data for each row.
-        private final List<ComplexCellBlock> cells;
-        private final boolean isCounter;
-
-        private SortableComplexRowDataBlock(Columns columns, int rows, boolean isCounter)
-        {
-            super(columns, rows);
-            this.cells = new ArrayList<>(rows);
-            this.isCounter = isCounter;
-        }
-
-        protected ComplexCellBlock cellBlockForWritting(int row)
-        {
-            if (row < cells.size())
-                return cells.get(row);
-
-            // Make sure the list of size 'row-1' before the insertion, adding nulls if necessary,
-            // so that we do are writing row 'row'
-            ensureCapacity(row-1);
-
-            assert row == cells.size();
-            ComplexCellBlock block = new ComplexCellBlock(columns(), 1, isCounter);
-            cells.add(block);
-            return block;
-        }
-
-        private void ensureCapacity(int row)
-        {
-            while (row >= cells.size())
-                cells.add(null);
-        }
-
-        protected ComplexCellBlock cellBlock(int row)
-        {
-            return row >= cells.size() ? null : cells.get(row);
-        }
-
-        protected int cellBlockBase(int row)
-        {
-            return 0;
-        }
-
-        // Swap row i and j
-        protected void swapCells(int i, int j)
-        {
-            int max = Math.max(i, j);
-            if (max >= cells.size())
-                ensureCapacity(max);
-
-            ComplexCellBlock block = cells.get(j);
-            move(i, j);
-            cells.set(i, block);
-        }
-
-        // Merge row i into j
-        protected void mergeCells(int i, int j, int nowInSec)
-        {
-            assert i > j;
-            if (i >= cells.size())
-                return;
-
-            ComplexCellBlock b1 = cells.get(i);
-            if (b1 == null)
-                return; // nothing to merge into j
-
-            ComplexCellBlock b2 = cells.get(j);
-            if (b2 == null)
-            {
-                cells.set(j, b1);
-                return;
-            }
-
-            ComplexCellBlock merged = new ComplexCellBlock(columns(), 1, isCounter);
-
-            int idxMerged = 0;
-            int s = columns().complexColumnCount();
-            for (int k = 0; k < s; k++)
-            {
-                ColumnDefinition column = columns().getComplex(k);
-                Comparator<CellPath> comparator = column.cellPathComparator();
-
-                merged.cellIdx[2 * k] = idxMerged;
-
-                int idx1 = b1.cellIdx[2 * k];
-                int end1 = b1.cellIdx[2 * k + 1];
-                int idx2 = b2.cellIdx[2 * k];
-                int end2 = b2.cellIdx[2 * k + 1];
-
-                while (idx1 < end1 || idx2 < end2)
-                {
-                    int cmp = idx1 >= end1 ? 1
-                            : (idx2 >= end2 ? -1
-                            : comparator.compare(b1.complexPaths[idx1], b2.complexPaths[idx2]));
-
-                    if (cmp == 0)
-                        merge(b1, idx1++, b2, idx2++, merged, idxMerged++, nowInSec);
-                    else if (cmp < 0)
-                        copy(b1, idx1++, merged, idxMerged++);
-                    else
-                        copy(b2, idx2++, merged, idxMerged++);
-                }
-
-                merged.cellIdx[2 * k + 1] = idxMerged;
-            }
-
-            cells.set(j, merged);
-        }
-
-        private void copy(ComplexCellBlock fromBlock, int fromIdx, ComplexCellBlock toBlock, int toIdx)
-        {
-            fromBlock.data.moveCell(fromIdx, toBlock.data, toIdx);
-            toBlock.ensureComplexPathsCapacity(toIdx);
-            toBlock.complexPaths[toIdx] = fromBlock.complexPaths[fromIdx];
-        }
-
-        private void merge(ComplexCellBlock b1, int idx1, ComplexCellBlock b2, int idx2, ComplexCellBlock mergedBlock, int mergedIdx, int nowInSec)
-        {
-            if (isCounter)
-                CellData.mergeCounterCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec);
-            else
-                CellData.mergeRegularCell(b1.data, idx1, b2.data, idx2, mergedBlock.data, mergedIdx, nowInSec);
-            mergedBlock.ensureComplexPathsCapacity(mergedIdx);
-            mergedBlock.complexPaths[mergedIdx] = b1.complexPaths[idx1];
-        }
-
-        // Move row i into j
-        protected void moveCells(int i, int j)
-        {
-            int max = Math.max(i, j);
-            if (max >= cells.size())
-                ensureCapacity(max);
-
-            cells.set(j, cells.get(i));
-        }
-
-        protected long cellDataUnsharedHeapSizeExcludingData()
-        {
-            long size = EMPTY_SIZE;
-            for (ComplexCellBlock block : cells)
-                if (block != null)
-                    size += block.unsharedHeapSizeExcludingData();
-            return size;
-        }
-
-        protected int dataCellSize()
-        {
-            int size = 0;
-            for (ComplexCellBlock block : cells)
-                if (block != null)
-                    size += block.dataSize();
-            return size;
-        }
-
-        protected void clearCellData()
-        {
-            for (ComplexCellBlock block : cells)
-                if (block != null)
-                    block.clear();
-        }
-    }
-
-    /**
-     * Stores complex column cell data for one or more rows.
-     * <p>
-     * On top of a {@code CellData} object, this stores an index to where the cells
-     * of a given column start and stop in that {@code CellData} object (cellIdx)
-     * as well as the cell path for the cells (since {@code CellData} doesn't have those).
-     */
-    private static class ComplexCellBlock
-    {
-        private final Columns columns;
-
-        /*
-         * For a given complex column c, we have to store an unknown number of
-         * cells. So for each column of each row, we keep pointers (in data)
-         * to the start and end of the cells for this column (cells for a given
-         * columns are thus stored contiguously).
-         * For instance, if columns has 'c' complex columns, the x-th column of
-         * row 'n' will have it's cells in data at indexes
-         *    [cellIdx[2 * (n * c + x)], cellIdx[2 * (n * c + x) + 1])
-         */
-        private int[] cellIdx;
-
-        private final CellData data;
-
-        // The first free idx in data (for writing purposes).
-        private int idx;
-
-        // THe (complex) cells path. This is indexed exactly like the cells in data (so through cellIdx).
-        private CellPath[] complexPaths;
-
-        public ComplexCellBlock(Columns columns, int rows, boolean isCounter)
-        {
-            this.columns = columns;
-
-            int columnCount = columns.complexColumnCount();
-            this.cellIdx = new int[columnCount * 2 * rows];
-
-            // We start with an estimated 4 cells per complex column. The arrays
-            // will grow if needed so this is just a somewhat random estimation.
-            int cellCount =  columnCount * 4;
-            this.data = new CellData(cellCount, isCounter);
-            this.complexPaths = new CellPath[cellCount];
-        }
-
-        public void addCell(int columnIdx, ByteBuffer value, LivenessInfo info, CellPath path, boolean isFirstCell)
-        {
-            if (isFirstCell)
-                cellIdx[columnIdx] = idx;
-            cellIdx[columnIdx + 1] = idx + 1;
-
-            data.setCell(idx, value, info);
-            ensureComplexPathsCapacity(idx);
-            complexPaths[idx] = path;
-            idx++;
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            long size = ObjectSizes.sizeOfArray(cellIdx)
-                      + data.unsharedHeapSizeExcludingData()
-                      + ObjectSizes.sizeOfArray(complexPaths);
-
-            for (int i = 0; i < complexPaths.length; i++)
-                if (complexPaths[i] != null)
-                    size += ((MemtableRowData.BufferCellPath)complexPaths[i]).unsharedHeapSizeExcludingData();
-            return size;
-        }
-
-        public int dataSize()
-        {
-            int size = data.dataSize() + cellIdx.length * 4;
-
-            for (int i = 0; i < complexPaths.length; i++)
-                if (complexPaths[i] != null)
-                    size += complexPaths[i].dataSize();
-
-            return size;
-        }
-
-        private void ensureCapacity(int rowToSet)
-        {
-            int columnCount = columns.complexColumnCount();
-            int originalCapacity = cellIdx.length / (2 * columnCount);
-            if (rowToSet < originalCapacity)
-                return;
-
-            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
-            cellIdx = Arrays.copyOf(cellIdx, newCapacity * 2 * columnCount);
-        }
-
-        private void ensureComplexPathsCapacity(int idxToSet)
-        {
-            int originalCapacity = complexPaths.length;
-            if (idxToSet < originalCapacity)
-                return;
-
-            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet);
-            complexPaths = Arrays.copyOf(complexPaths, newCapacity);
-        }
-
-        public void clear()
-        {
-            data.clear();
-            Arrays.fill(cellIdx, 0);
-            Arrays.fill(complexPaths, null);
-            idx = 0;
-        }
-    }
-
-    /**
-     * Simple sublcassing of {@code CellData.ReusableCell} to include the cell path.
-     */
-    private static class ReusableCell extends CellData.ReusableCell
-    {
-        private ComplexCellBlock cellBlock;
-
-        ReusableCell setTo(ComplexCellBlock cellBlock, ColumnDefinition column, int idx)
-        {
-            this.cellBlock = cellBlock;
-            super.setTo(cellBlock.data, column, idx);
-            return this;
-        }
-
-        @Override
-        public CellPath path()
-        {
-            return cellBlock.complexPaths[idx];
-        }
-    }
-
-    /**
-     * An iterator over the complex cells of a given row.
-     * This is used both to iterate over all the (complex) cells of the row, or only on the cells
-     * of a given column within the row.
-     */
-    static class ReusableIterator extends UnmodifiableIterator<Cell>
-    {
-        private ComplexCellBlock cellBlock;
-        private final ReusableCell cell = new ReusableCell();
-
-        // The idx in 'cellBlock' of the row we're iterating over
-        private int rowIdx;
-
-        // columnIdx is the index in 'columns' of the current column we're iterating over.
-        // 'endColumnIdx' is the value of 'columnIdx' at which we should stop iterating.
-        private int columnIdx;
-        private int endColumnIdx;
-
-        // idx is the index in 'cellBlock.data' of the current cell this iterator is on. 'endIdx'
-        // is the index in 'cellBlock.data' of the first cell that does not belong to the current
-        // column we're iterating over (the one pointed by columnIdx).
-        private int idx;
-        private int endIdx;
-
-        private ReusableIterator()
-        {
-        }
-
-        // Sets the iterator for iterating over the cells of 'column' in 'row'
-        public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row, ColumnDefinition column)
-        {
-            if (dataBlock == null)
-            {
-                this.cellBlock = null;
-                return null;
-            }
-
-            this.cellBlock = dataBlock.cellBlock(row);
-            if (cellBlock == null)
-                return null;
-
-            rowIdx = dataBlock.cellBlockBase(row);
-
-            columnIdx = dataBlock.columns.complexIdx(column, 0);
-            if (columnIdx < 0)
-                return null;
-
-            // We only want the cells of 'column', so stop as soon as we've reach the next column
-            endColumnIdx = columnIdx + 1;
-
-            resetCellIdx();
-
-            return endIdx <= idx ? null : this;
-        }
-
-        // Sets the iterator for iterating over all the cells of 'row'
-        public ReusableIterator setTo(ComplexRowDataBlock dataBlock, int row)
-        {
-            if (dataBlock == null)
-            {
-                this.cellBlock = null;
-                return null;
-            }
-
-            this.cellBlock = dataBlock.cellBlock(row);
-            if (cellBlock == null)
-                return null;
-
-            rowIdx = dataBlock.cellBlockBase(row);
-
-            // We want to iterator over all columns
-            columnIdx = 0;
-            endColumnIdx = dataBlock.columns.complexColumnCount();
-
-            // Not every column might have cells, so set thing up so we're on the
-            // column having cells (with idx and endIdx sets properly for that column)
-            findNextColumnWithCells();
-            return columnIdx < endColumnIdx ? null : this;
-        }
-
-        private void findNextColumnWithCells()
-        {
-            while (columnIdx < endColumnIdx)
-            {
-                resetCellIdx();
-                if (idx < endIdx)
-                    return;
-                ++columnIdx;
-            }
-        }
-
-        // Provided that columnIdx and rowIdx are properly set, sets idx to the first
-        // cells of the pointed column, and endIdx to the first cell not for said column
-        private void resetCellIdx()
-        {
-            int i = rowIdx + 2 * columnIdx;
-            if (i >= cellBlock.cellIdx.length)
-            {
-                idx = 0;
-                endIdx = 0;
-            }
-            else
-            {
-                idx = cellBlock.cellIdx[i];
-                endIdx = cellBlock.cellIdx[i + 1];
-            }
-        }
-
-        public boolean hasNext()
-        {
-            if (cellBlock == null)
-                return false;
-
-            if (columnIdx >= endColumnIdx)
-                return false;
-
-            // checks if we have more cells for the current column
-            if (idx < endIdx)
-                return true;
-
-            // otherwise, find the next column that has cells.
-            ++columnIdx;
-            findNextColumnWithCells();
-
-            return columnIdx < endColumnIdx;
-        }
-
-        public Cell next()
-        {
-            return cell.setTo(cellBlock, cellBlock.columns.getComplex(columnIdx), idx++);
-        }
-    }
-
-    public class CellWriter
-    {
-        private final boolean inOrderCells;
-
-        private int base;
-        private int row;
-        private int lastColumnIdx;
-
-        public CellWriter(boolean inOrderCells)
-        {
-            this.inOrderCells = inOrderCells;
-        }
-
-        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            assert path != null;
-
-            ComplexCellBlock cellBlock = cellBlockForWritting(row);
-
-            lastColumnIdx = columns.complexIdx(column, inOrderCells ? lastColumnIdx : 0);
-            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns;
-
-            int idx = cellBlockBase(row) + 2 * lastColumnIdx;
-
-            int start = cellBlock.cellIdx[idx];
-            int end = cellBlock.cellIdx[idx + 1];
-
-            cellBlock.addCell(idx, value, info, path, end <= start);
-        }
-
-        public void setComplexDeletion(ColumnDefinition column, DeletionTime deletionTime)
-        {
-            int columnIdx = base + columns.complexIdx(column, 0);
-            ensureDelTimesCapacity(row);
-            complexDelTimes.set(columnIdx, deletionTime);
-        }
-
-        public void endOfRow()
-        {
-            base += columns.complexColumnCount();
-            lastColumnIdx = 0;
-            ++row;
-        }
-
-        public void reset()
-        {
-            base = 0;
-            row = 0;
-            lastColumnIdx = 0;
-            clearCellData();
-            complexDelTimes.clear();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/FilteringRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRow.java b/src/java/org/apache/cassandra/db/rows/FilteringRow.java
deleted file mode 100644
index fb8f448..0000000
--- a/src/java/org/apache/cassandra/db/rows/FilteringRow.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Iterator;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
-public abstract class FilteringRow extends WrappingRow
-{
-    public static FilteringRow columnsFilteringRow(final Columns toInclude)
-    {
-        return new FilteringRow()
-        {
-            @Override
-            protected boolean include(ColumnDefinition column)
-            {
-                return toInclude.contains(column);
-            }
-        };
-    }
-
-    public static FilteringRow columnsFilteringRow(final ColumnFilter toInclude)
-    {
-        return new FilteringRow()
-        {
-            @Override
-            protected boolean include(ColumnDefinition column)
-            {
-                return toInclude.includes(column);
-            }
-
-            @Override
-            protected boolean include(Cell cell)
-            {
-                return toInclude.includes(cell);
-            }
-        };
-    }
-
-    public FilteringRow setTo(Row row)
-    {
-        super.setTo(row);
-        return this;
-    }
-
-    /**
-     * The following functions are meant to be overriden based on needs.
-     */
-    protected boolean include(Cell cell) { return true; }
-    protected boolean include(LivenessInfo info) { return true; }
-    protected boolean include(DeletionTime dt) { return true; }
-    protected boolean include(ColumnDefinition column) { return true; }
-    protected boolean include(ColumnDefinition c, DeletionTime dt) { return true; }
-
-    // Sublcasses that override this should be careful to call the overriden version first, or this might break FilteringRow (i.e. it might not
-    // filter what it should).
-    @Override
-    protected Cell filterCell(Cell cell)
-    {
-        return include(cell.column()) && include(cell.livenessInfo()) && include(cell) ? cell : null;
-    }
-
-    protected DeletionTime filterDeletionTime(DeletionTime deletion)
-    {
-        return deletion == null || !include(deletion)
-             ? DeletionTime.LIVE
-             : deletion;
-    }
-
-    @Override
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        LivenessInfo info = super.primaryKeyLivenessInfo();
-        return include(info) ? info : LivenessInfo.NONE;
-    }
-
-    @Override
-    public DeletionTime deletion()
-    {
-        DeletionTime deletion = super.deletion();
-        return include(deletion) ? deletion : DeletionTime.LIVE;
-    }
-
-    @Override
-    public Iterator<Cell> getCells(ColumnDefinition c)
-    {
-        // slightly speed things up if we know we don't care at all about the column
-        if (!include(c))
-            return null;
-
-        return super.getCells(c);
-    }
-
-    @Override
-    public DeletionTime getDeletion(ColumnDefinition c)
-    {
-        if (!include(c))
-            return DeletionTime.LIVE;
-
-        DeletionTime dt = super.getDeletion(c);
-        return include(c, dt) ? dt : DeletionTime.LIVE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java b/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
deleted file mode 100644
index fd1c0a1..0000000
--- a/src/java/org/apache/cassandra/db/rows/FilteringRowIterator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import org.apache.cassandra.db.*;
-
-public class FilteringRowIterator extends WrappingUnfilteredRowIterator
-{
-    private final FilteringRow filter;
-    private Unfiltered next;
-
-    public FilteringRowIterator(UnfilteredRowIterator toFilter)
-    {
-        super(toFilter);
-        this.filter = makeRowFilter();
-    }
-
-    // Subclasses that want to filter withing row should overwrite this. Note that since FilteringRow
-    // is a reusable object, this method won't be called for every filtered row and the same filter will
-    // be used for every regular rows. However, this still can be called twice if we have a static row
-    // to filter, because we don't want to use the same object for them as this makes for weird behavior
-    // if calls to staticRow() are interleaved with hasNext().
-    protected FilteringRow makeRowFilter()
-    {
-        return null;
-    }
-
-    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
-    {
-        return true;
-    }
-
-    // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called.
-    protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
-    {
-        return marker;
-    }
-
-    protected boolean includeRow(Row row)
-    {
-        return true;
-    }
-
-    protected boolean includePartitionDeletion(DeletionTime dt)
-    {
-        return true;
-    }
-
-    @Override
-    public DeletionTime partitionLevelDeletion()
-    {
-        DeletionTime dt = wrapped.partitionLevelDeletion();
-        return includePartitionDeletion(dt) ? dt : DeletionTime.LIVE;
-    }
-
-    @Override
-    public Row staticRow()
-    {
-        Row row = super.staticRow();
-        if (row == Rows.EMPTY_STATIC_ROW)
-            return row;
-
-        FilteringRow filter = makeRowFilter();
-        if (filter != null)
-            row = filter.setTo(row);
-
-        return !row.isEmpty() && includeRow(row) ? row : Rows.EMPTY_STATIC_ROW;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        if (next != null)
-            return true;
-
-        while (super.hasNext())
-        {
-            Unfiltered unfiltered = super.next();
-            if (unfiltered.kind() == Unfiltered.Kind.ROW)
-            {
-                Row row = filter == null ? (Row) unfiltered : filter.setTo((Row) unfiltered);
-                if (!row.isEmpty() && includeRow(row))
-                {
-                    next = row;
-                    return true;
-                }
-            }
-            else
-            {
-                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
-                if (includeRangeTombstoneMarker(marker))
-                {
-                    next = filterRangeTombstoneMarker(marker, isReverseOrder());
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public Unfiltered next()
-    {
-        if (next == null)
-            hasNext();
-
-        Unfiltered toReturn = next;
-        next = null;
-        return toReturn;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/MemtableRowData.java b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
deleted file mode 100644
index cad0765..0000000
--- a/src/java/org/apache/cassandra/db/rows/MemtableRowData.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-
-/**
- * Row data stored inside a memtable.
- *
- * This has methods like dataSize and unsharedHeapSizeExcludingData that are
- * specific to memtables.
- */
-public interface MemtableRowData extends Clusterable
-{
-    public Columns columns();
-
-    public int dataSize();
-
-    // returns the size of the Row and all references on the heap, excluding any costs associated with byte arrays
-    // that would be allocated by a clone operation, as these will be accounted for by the allocator
-    public long unsharedHeapSizeExcludingData();
-
-    public interface ReusableRow extends Row
-    {
-        public ReusableRow setTo(MemtableRowData rowData);
-    }
-
-    public class BufferRowData implements MemtableRowData
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferRowData(null, LivenessInfo.NONE, DeletionTime.LIVE, null));
-
-        private final Clustering clustering;
-        private final LivenessInfo livenessInfo;
-        private final DeletionTime deletion;
-        private final RowDataBlock dataBlock;
-
-        public BufferRowData(Clustering clustering, LivenessInfo livenessInfo, DeletionTime deletion, RowDataBlock dataBlock)
-        {
-            this.clustering = clustering;
-            this.livenessInfo = livenessInfo.takeAlias();
-            this.deletion = deletion.takeAlias();
-            this.dataBlock = dataBlock;
-        }
-
-        public Clustering clustering()
-        {
-            return clustering;
-        }
-
-        public Columns columns()
-        {
-            return dataBlock.columns();
-        }
-
-        public int dataSize()
-        {
-            return clustering.dataSize() + livenessInfo.dataSize() + deletion.dataSize() + dataBlock.dataSize();
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE
-                 + (clustering == Clustering.STATIC_CLUSTERING ? 0 : ((BufferClustering)clustering).unsharedHeapSizeExcludingData())
-                 + dataBlock.unsharedHeapSizeExcludingData();
-        }
-
-        public static ReusableRow createReusableRow()
-        {
-            return new BufferRow();
-        }
-
-        private static class BufferRow extends AbstractReusableRow implements ReusableRow
-        {
-            private BufferRowData rowData;
-
-            private BufferRow()
-            {
-            }
-
-            public ReusableRow setTo(MemtableRowData rowData)
-            {
-                assert rowData instanceof BufferRowData;
-                this.rowData = (BufferRowData)rowData;
-                return this;
-            }
-
-            protected RowDataBlock data()
-            {
-                return rowData.dataBlock;
-            }
-
-            protected int row()
-            {
-                return 0;
-            }
-
-            public Clustering clustering()
-            {
-                return rowData.clustering;
-            }
-
-            public LivenessInfo primaryKeyLivenessInfo()
-            {
-                return rowData.livenessInfo;
-            }
-
-            public DeletionTime deletion()
-            {
-                return rowData.deletion;
-            }
-        }
-    }
-
-    public class BufferClustering extends Clustering
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferClustering(0));
-
-        private final ByteBuffer[] values;
-
-        public BufferClustering(int size)
-        {
-            this.values = new ByteBuffer[size];
-        }
-
-        public void setClusteringValue(int i, ByteBuffer value)
-        {
-            values[i] = value;
-        }
-
-        public int size()
-        {
-            return values.length;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            return values[i];
-        }
-
-        public ByteBuffer[] getRawValues()
-        {
-            return values;
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-        }
-
-        @Override
-        public long unsharedHeapSize()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-        }
-
-        public Clustering takeAlias()
-        {
-            return this;
-        }
-    }
-
-    public class BufferCellPath extends CellPath.SimpleCellPath
-    {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCellPath(new ByteBuffer[0]));
-
-        private BufferCellPath(ByteBuffer[] values)
-        {
-            super(values);
-        }
-
-        public static BufferCellPath clone(CellPath path, AbstractAllocator allocator)
-        {
-            int size = path.size();
-            ByteBuffer[] values = new ByteBuffer[size];
-            for (int i = 0; i < size; i++)
-                values[i] = allocator.clone(path.get(0));
-            return new BufferCellPath(values);
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index 8b52b0b..b35033d 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A range tombstone marker that indicates the bound of a range tombstone (start or end).
@@ -34,7 +35,7 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
     public RangeTombstoneBoundMarker(RangeTombstone.Bound bound, DeletionTime deletion)
     {
         super(bound);
-        assert bound.kind().isBound();
+        assert !bound.isBoundary();
         this.deletion = deletion;
     }
 
@@ -43,16 +44,6 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
         this(new RangeTombstone.Bound(bound.kind(), bound.getRawValues()), deletion);
     }
 
-    public static RangeTombstoneBoundMarker inclusiveStart(ClusteringPrefix clustering, DeletionTime deletion)
-    {
-        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_START_BOUND, clustering.getRawValues()), deletion);
-    }
-
-    public static RangeTombstoneBoundMarker inclusiveEnd(ClusteringPrefix clustering, DeletionTime deletion)
-    {
-        return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_END_BOUND, clustering.getRawValues()), deletion);
-    }
-
     public static RangeTombstoneBoundMarker inclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion)
     {
         RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveOpen(reversed, boundValues);
@@ -90,16 +81,6 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
         return deletion;
     }
 
-    public boolean isOpen(boolean reversed)
-    {
-        return bound.kind().isOpen(reversed);
-    }
-
-    public boolean isClose(boolean reversed)
-    {
-        return bound.kind().isClose(reversed);
-    }
-
     public DeletionTime openDeletionTime(boolean reversed)
     {
         if (!isOpen(reversed))
@@ -128,11 +109,19 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
         return bound.isInclusive();
     }
 
-    public void copyTo(RangeTombstoneMarker.Writer writer)
+    public RangeTombstone.Bound openBound(boolean reversed)
+    {
+        return isOpen(reversed) ? clustering() : null;
+    }
+
+    public RangeTombstone.Bound closeBound(boolean reversed)
+    {
+        return isClose(reversed) ? clustering() : null;
+    }
+
+    public RangeTombstoneBoundMarker copy(AbstractAllocator allocator)
     {
-        copyBoundTo(writer);
-        writer.writeBoundDeletion(deletion);
-        writer.endOfMarker();
+        return new RangeTombstoneBoundMarker(clustering().copy(allocator), deletion);
     }
 
     public void digest(MessageDigest digest)
@@ -143,11 +132,7 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker
 
     public String toString(CFMetaData metadata)
     {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Marker ");
-        sb.append(bound.toString(metadata));
-        sb.append("@").append(deletion.markedForDeleteAt());
-        return sb.toString();
+        return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index f17515d..06fbf87 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A range tombstone marker that represents a boundary between 2 range tombstones (i.e. it closes one range and open another).
@@ -35,7 +36,7 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
     public RangeTombstoneBoundaryMarker(RangeTombstone.Bound bound, DeletionTime endDeletion, DeletionTime startDeletion)
     {
         super(bound);
-        assert bound.kind().isBoundary();
+        assert bound.isBoundary();
         this.endDeletion = endDeletion;
         this.startDeletion = startDeletion;
     }
@@ -56,11 +57,6 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
         return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion);
     }
 
-    public boolean isBoundary()
-    {
-        return true;
-    }
-
     /**
      * The deletion time for the range tombstone this boundary ends (in clustering order).
      */
@@ -92,6 +88,16 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
         return (bound.kind() == ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY) ^ reversed;
     }
 
+    public RangeTombstone.Bound openBound(boolean reversed)
+    {
+        return bound.withNewKind(bound.kind().openBoundOfBoundary(reversed));
+    }
+
+    public RangeTombstone.Bound closeBound(boolean reversed)
+    {
+        return bound.withNewKind(bound.kind().closeBoundOfBoundary(reversed));
+    }
+
     public boolean closeIsInclusive(boolean reversed)
     {
         return (bound.kind() == ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY) ^ reversed;
@@ -109,6 +115,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
         return true;
     }
 
+    public RangeTombstoneBoundaryMarker copy(AbstractAllocator allocator)
+    {
+        return new RangeTombstoneBoundaryMarker(clustering().copy(allocator), endDeletion, startDeletion);
+    }
+
     public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion)
     {
         assert RangeTombstone.Bound.Kind.compare(close.kind(), open.kind()) == 0 : "Both bound don't form a boundary";
@@ -118,21 +129,14 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
              : inclusiveCloseExclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion);
     }
 
-    public RangeTombstoneBoundMarker createCorrespondingCloseBound(boolean reversed)
-    {
-        return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().closeBoundOfBoundary(reversed)), endDeletion);
-    }
-
-    public RangeTombstoneBoundMarker createCorrespondingOpenBound(boolean reversed)
+    public RangeTombstoneBoundMarker createCorrespondingCloseMarker(boolean reversed)
     {
-        return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().openBoundOfBoundary(reversed)), startDeletion);
+        return new RangeTombstoneBoundMarker(closeBound(reversed), endDeletion);
     }
 
-    public void copyTo(RangeTombstoneMarker.Writer writer)
+    public RangeTombstoneBoundMarker createCorrespondingOpenMarker(boolean reversed)
     {
-        copyBoundTo(writer);
-        writer.writeBoundaryDeletion(endDeletion, startDeletion);
-        writer.endOfMarker();
+        return new RangeTombstoneBoundMarker(openBound(reversed), startDeletion);
     }
 
     public void digest(MessageDigest digest)
@@ -144,11 +148,7 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker
 
     public String toString(CFMetaData metadata)
     {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Marker ");
-        sb.append(bound.toString(metadata));
-        sb.append("@").append(endDeletion.markedForDeleteAt()).append("-").append(startDeletion.markedForDeleteAt());
-        return sb.toString();
+        return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
index 380e6b0..5771a86 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A marker for a range tombstone bound.
@@ -35,78 +36,18 @@ public interface RangeTombstoneMarker extends Unfiltered
 
     public boolean isBoundary();
 
-    public void copyTo(RangeTombstoneMarker.Writer writer);
-
     public boolean isOpen(boolean reversed);
     public boolean isClose(boolean reversed);
+
     public DeletionTime openDeletionTime(boolean reversed);
     public DeletionTime closeDeletionTime(boolean reversed);
     public boolean openIsInclusive(boolean reversed);
     public boolean closeIsInclusive(boolean reversed);
 
-    public interface Writer extends Slice.Bound.Writer
-    {
-        public void writeBoundDeletion(DeletionTime deletion);
-        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion);
-        public void endOfMarker();
-    }
-
-    public static class Builder implements Writer
-    {
-        private final ByteBuffer[] values;
-        private int size;
-
-        private RangeTombstone.Bound.Kind kind;
-        private DeletionTime firstDeletion;
-        private DeletionTime secondDeletion;
-
-        public Builder(int maxClusteringSize)
-        {
-            this.values = new ByteBuffer[maxClusteringSize];
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            values[size++] = value;
-        }
-
-        public void writeBoundKind(RangeTombstone.Bound.Kind kind)
-        {
-            this.kind = kind;
-        }
-
-        public void writeBoundDeletion(DeletionTime deletion)
-        {
-            firstDeletion = deletion;
-        }
-
-        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
-        {
-            firstDeletion = endDeletion;
-            secondDeletion = startDeletion;
-        }
-
-        public void endOfMarker()
-        {
-        }
-
-        public RangeTombstoneMarker build()
-        {
-            assert kind != null : "Nothing has been written";
-            if (kind.isBoundary())
-                return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion, secondDeletion);
-            else
-                return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion);
-        }
+    public RangeTombstone.Bound openBound(boolean reversed);
+    public RangeTombstone.Bound closeBound(boolean reversed);
 
-        public Builder reset()
-        {
-            Arrays.fill(values, null);
-            size = 0;
-            kind = null;
-            return this;
-        }
-    }
+    public RangeTombstoneMarker copy(AbstractAllocator allocator);
 
     /**
      * Utility class to help merging range tombstone markers coming from multiple inputs (UnfilteredRowIterators).
@@ -123,8 +64,6 @@ public interface RangeTombstoneMarker extends Unfiltered
      */
     public static class Merger
     {
-        private final CFMetaData metadata;
-        private final UnfilteredRowIterators.MergeListener listener;
         private final DeletionTime partitionDeletion;
         private final boolean reversed;
 
@@ -137,10 +76,8 @@ public interface RangeTombstoneMarker extends Unfiltered
         // marker on any iterator.
         private int biggestOpenMarker = -1;
 
-        public Merger(CFMetaData metadata, int size, DeletionTime partitionDeletion, boolean reversed, UnfilteredRowIterators.MergeListener listener)
+        public Merger(int size, DeletionTime partitionDeletion, boolean reversed)
         {
-            this.metadata = metadata;
-            this.listener = listener;
             this.partitionDeletion = partitionDeletion;
             this.reversed = reversed;
 
@@ -202,12 +139,14 @@ public interface RangeTombstoneMarker extends Unfiltered
                        : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged);
             }
 
-            if (listener != null)
-                listener.onMergedRangeTombstoneMarkers(merged, markers);
-
             return merged;
         }
 
+        public RangeTombstoneMarker[] mergedMarkers()
+        {
+            return markers;
+        }
+
         private DeletionTime currentOpenDeletionTimeInMerged()
         {
             if (biggestOpenMarker < 0)
@@ -215,7 +154,7 @@ public interface RangeTombstoneMarker extends Unfiltered
 
             DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker];
             // it's only open in the merged iterator if it's not shadowed by the partition level deletion
-            return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime.takeAlias();
+            return partitionDeletion.supersedes(biggestDeletionTime) ? DeletionTime.LIVE : biggestDeletionTime;
         }
 
         private void updateOpenMarkers()
@@ -229,7 +168,7 @@ public interface RangeTombstoneMarker extends Unfiltered
                 // Note that we can have boundaries that are both open and close, but in that case all we care about
                 // is what it the open deletion after the marker, so we favor the opening part in this case.
                 if (marker.isOpen(reversed))
-                    openMarkers[i] = marker.openDeletionTime(reversed).takeAlias();
+                    openMarkers[i] = marker.openDeletionTime(reversed);
                 else
                     openMarkers[i] = null;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ReusableRow.java b/src/java/org/apache/cassandra/db/rows/ReusableRow.java
deleted file mode 100644
index 0135afc..0000000
--- a/src/java/org/apache/cassandra/db/rows/ReusableRow.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-
-public class ReusableRow extends AbstractReusableRow
-{
-    private final ReusableClustering clustering;
-
-    private final ReusableLivenessInfo liveness = new ReusableLivenessInfo();
-
-    private DeletionTime deletion = DeletionTime.LIVE;
-
-    private final RowDataBlock data;
-    private final Writer writer;
-
-    public ReusableRow(int clusteringSize, Columns columns, boolean inOrderCells, boolean isCounter)
-    {
-        this.clustering = new ReusableClustering(clusteringSize);
-        this.data = new RowDataBlock(columns, 1, false, isCounter);
-        this.writer = new Writer(data, inOrderCells);
-    }
-
-    protected RowDataBlock data()
-    {
-        return data;
-    }
-
-    protected int row()
-    {
-        return 0;
-    }
-
-    public Clustering clustering()
-    {
-        return clustering;
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return liveness;
-    }
-
-    public DeletionTime deletion()
-    {
-        return deletion;
-    }
-
-    public Row.Writer writer()
-    {
-        return writer.reset();
-    }
-
-    private class Writer extends RowDataBlock.Writer
-    {
-        public Writer(RowDataBlock data, boolean inOrderCells)
-        {
-            super(data, inOrderCells);
-        }
-
-        public void writeClusteringValue(ByteBuffer buffer)
-        {
-            clustering.writer().writeClusteringValue(buffer);
-        }
-
-        public void writePartitionKeyLivenessInfo(LivenessInfo info)
-        {
-            ReusableRow.this.liveness.setTo(info);
-        }
-
-        public void writeRowDeletion(DeletionTime deletion)
-        {
-            ReusableRow.this.deletion = deletion;
-        }
-
-        @Override
-        public Writer reset()
-        {
-            super.reset();
-            clustering.reset();
-            liveness.reset();
-            deletion = DeletionTime.LIVE;
-            return this;
-        }
-    }
-}