You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/28 16:00:59 UTC

[1/2] cassandra git commit: Use BTree to back default Row and ComplexColumnData objects

Repository: cassandra
Updated Branches:
  refs/heads/trunk de420e50e -> 639d4b240


http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 95bad48..84763e5 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -31,42 +31,45 @@ import org.apache.cassandra.serializers.MarshalException;
  * In practice, there is only 2 implementations of this: either {@link Cell} for simple columns
  * or {@code ComplexColumnData} for complex columns.
  */
-public interface ColumnData
+public abstract class ColumnData
 {
     public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
 
-    // A comparator for the cells of the *similar* ColumnData, i.e. one that assumes the cells are all for the same column.
-    public static final Comparator<Cell> cellComparator = (c1, c2) -> c1.column().cellPathComparator().compare(c1.path(), c2.path());
+    protected final ColumnDefinition column;
+    protected ColumnData(ColumnDefinition column)
+    {
+        this.column = column;
+    }
 
     /**
      * The column this is data for.
      *
      * @return the column this is a data for.
      */
-    public ColumnDefinition column();
+    public final ColumnDefinition column() { return column; }
 
     /**
      * The size of the data hold by this {@code ColumnData}.
      *
      * @return the size used by the data of this {@code ColumnData}.
      */
-    public int dataSize();
+    public abstract int dataSize();
 
-    public long unsharedHeapSizeExcludingData();
+    public abstract long unsharedHeapSizeExcludingData();
 
     /**
      * Validate the column data.
      *
      * @throws MarshalException if the data is not valid.
      */
-    public void validate();
+    public abstract void validate();
 
     /**
      * Adds the data to the provided digest.
      *
      * @param digest the {@code MessageDigest} to add the data to.
      */
-    public void digest(MessageDigest digest);
+    public abstract void digest(MessageDigest digest);
 
     /**
      * Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and
@@ -74,9 +77,9 @@ public interface ColumnData
      *
      * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
      */
-    public ColumnData updateAllTimestamp(long newTimestamp);
+    public abstract ColumnData updateAllTimestamp(long newTimestamp);
 
-    public ColumnData markCounterLocalToBeCleared();
+    public abstract ColumnData markCounterLocalToBeCleared();
 
-    public ColumnData purge(DeletionPurger purger, int nowInSec);
+    public abstract ColumnData purge(DeletionPurger purger, int nowInSec);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index d87402a..76ab7e7 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.function.BiFunction;
 
-import com.google.common.collect.Iterators;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -31,59 +33,52 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
  * The data for a complex column, that is it's cells and potential complex
  * deletion time.
  */
-public class ComplexColumnData implements ColumnData, Iterable<Cell>
+public class ComplexColumnData extends ColumnData implements Iterable<Cell>
 {
     static final Cell[] NO_CELLS = new Cell[0];
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnDefinition.regularDef("", "", "", SetType.getInstance(ByteType.instance, true)), NO_CELLS, new DeletionTime(0, 0)));
 
-    private final ColumnDefinition column;
-
     // The cells for 'column' sorted by cell path.
-    private final Cell[] cells;
+    private final Object[] cells;
 
     private final DeletionTime complexDeletion;
 
     // Only ArrayBackedRow should call this.
-    ComplexColumnData(ColumnDefinition column, Cell[] cells, DeletionTime complexDeletion)
+    ComplexColumnData(ColumnDefinition column, Object[] cells, DeletionTime complexDeletion)
     {
+        super(column);
         assert column.isComplex();
         assert cells.length > 0 || !complexDeletion.isLive();
-        this.column = column;
         this.cells = cells;
         this.complexDeletion = complexDeletion;
     }
 
     public boolean hasCells()
     {
-        return cellsCount() > 0;
+        return !BTree.isEmpty(cells);
     }
 
     public int cellsCount()
     {
-        return cells.length;
-    }
-
-    public ColumnDefinition column()
-    {
-        return column;
+        return BTree.size(cells);
     }
 
     public Cell getCell(CellPath path)
     {
-        int idx = binarySearch(path);
-        return idx < 0 ? null : cells[idx];
+        return (Cell) BTree.<Object>find(cells, column.asymmetricCellPathComparator(), path);
     }
 
-    public Cell getCellByIndex(int i)
+    public Cell getCellByIndex(int idx)
     {
-        assert 0 <= i && i < cells.length;
-        return cells[i];
+        return BTree.findByIndex(cells, idx);
     }
 
     /**
@@ -104,13 +99,13 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
 
     public Iterator<Cell> iterator()
     {
-        return Iterators.forArray(cells);
+        return BTree.iterator(cells);
     }
 
     public int dataSize()
     {
         int size = complexDeletion.dataSize();
-        for (Cell cell : cells)
+        for (Cell cell : this)
             size += cell.dataSize();
         return size;
     }
@@ -118,167 +113,78 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
     public long unsharedHeapSizeExcludingData()
     {
         long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
-        for (Cell cell : cells)
+        // TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation
+        for (Cell cell : this)
             heapSize += cell.unsharedHeapSizeExcludingData();
         return heapSize;
     }
 
     public void validate()
     {
-        for (Cell cell : cells)
+        for (Cell cell : this)
             cell.validate();
     }
 
-    public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
-    {
-        ColumnFilter.Tester cellTester = filter.newTester(column);
-        if (cellTester == null && activeDeletion.isLive() && dropped == null)
-            return this;
-
-        DeletionTime newComplexDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
-
-        int newSize = 0;
-        for (Cell cell : cells)
-        {
-            // The cell must be:
-            //   - Included by the query
-            //   - not shadowed by the active deletion
-            //   - not being for a dropped column
-            if ((cellTester == null || cellTester.includes(cell.path()))
-                 && !activeDeletion.deletes(cell)
-                 && (dropped == null || cell.timestamp() > dropped.droppedTime))
-                ++newSize;
-        }
-
-
-        if (newSize == 0)
-            return newComplexDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newComplexDeletion);
-
-        if (newSize == cells.length && newComplexDeletion == complexDeletion)
-            return this;
-
-        Cell[] newCells = new Cell[newSize];
-        int j = 0;
-        cellTester = filter.newTester(column); // we need to reste the tester
-        for (Cell cell : cells)
-        {
-            if ((cellTester == null || cellTester.includes(cell.path()))
-                && !activeDeletion.deletes(cell)
-                && (dropped == null || cell.timestamp() > dropped.droppedTime))
-                newCells[j++] = cell;
-        }
-        assert j == newSize;
-
-        return new ComplexColumnData(column, newCells, newComplexDeletion);
-    }
-
     public void digest(MessageDigest digest)
     {
         if (!complexDeletion.isLive())
             complexDeletion.digest(digest);
 
-        for (Cell cell : cells)
+        for (Cell cell : this)
             cell.digest(digest);
     }
 
     public ComplexColumnData markCounterLocalToBeCleared()
     {
-        Cell[] newCells = null;
-        for (int i = 0; i < cells.length; i++)
-        {
-            Cell cell = cells[i];
-            Cell marked = cell.markCounterLocalToBeCleared();
-            if (marked != cell)
-            {
-                if (newCells == null)
-                    newCells = Arrays.copyOf(cells, cells.length);
-                newCells[i] = marked;
-            }
-        }
+        return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);
+    }
+
+    public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
+    {
+        ColumnFilter.Tester cellTester = filter.newTester(column);
+        if (cellTester == null && activeDeletion.isLive() && dropped == null)
+            return this;
 
-        return newCells == null
-             ? this
-             : new ComplexColumnData(column, newCells, complexDeletion);
+        DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+        return transformAndFilter(newDeletion,
+                                  (cell) ->
+                                           (cellTester == null || cellTester.includes(cell.path()))
+                                        && !activeDeletion.deletes(cell)
+                                        && (dropped == null || cell.timestamp() > dropped.droppedTime)
+                                           ? cell : null);
     }
 
     public ComplexColumnData purge(DeletionPurger purger, int nowInSec)
     {
         DeletionTime newDeletion = complexDeletion.isLive() || purger.shouldPurge(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+        return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, nowInSec));
+    }
 
-        int newSize = 0;
-        for (Cell cell : cells)
-        {
-            Cell purged = cell.purge(purger, nowInSec);
-            if (purged != null)
-                ++newSize;
-        }
-
-        if (newSize == 0)
-            return newDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newDeletion);
+    private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function)
+    {
+        Object[] transformed = BTree.transformAndFilter(cells, function);
 
-        if (newDeletion == complexDeletion && newSize == cells.length)
+        if (cells == transformed && newDeletion == complexDeletion)
             return this;
 
-        Cell[] newCells = new Cell[newSize];
-        int j = 0;
-        for (Cell cell : cells)
-        {
-            Cell purged = cell.purge(purger, nowInSec);
-            if (purged != null)
-                newCells[j++] = purged;
-        }
-        assert j == newSize;
+        if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(transformed))
+            return null;
 
-        return new ComplexColumnData(column, newCells, newDeletion);
+        return new ComplexColumnData(column, transformed, newDeletion);
     }
 
     public ComplexColumnData updateAllTimestamp(long newTimestamp)
     {
         DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion : new DeletionTime(newTimestamp - 1, complexDeletion.localDeletionTime());
-        Cell[] newCells = new Cell[cells.length];
-        for (int i = 0; i < cells.length; i++)
-            newCells[i] = (Cell)cells[i].updateAllTimestamp(newTimestamp);
-
-        return new ComplexColumnData(column, newCells, newDeletion);
+        return transformAndFilter(newDeletion, (cell) -> (Cell) cell.updateAllTimestamp(newTimestamp));
     }
 
     // This is the partner in crime of ArrayBackedRow.setValue. The exact warning apply. The short
     // version is: "don't use that method".
     void setValue(CellPath path, ByteBuffer value)
     {
-        int idx = binarySearch(path);
-        assert idx >= 0;
-        cells[idx] = cells[idx].withUpdatedValue(value);
-    }
-
-    private int binarySearch(CellPath path)
-    {
-        return binarySearch(path, 0, cells.length);
-    }
-
-    /**
-     * Simple binary search for a given cell (in the cells array).
-     *
-     * The return value has the exact same meaning that the one of Collections.binarySearch() but
-     * we don't use the later because we're searching for a 'CellPath' in an array of 'Cell'.
-     */
-    private int binarySearch(CellPath path, int fromIndex, int toIndex)
-    {
-        int low = fromIndex;
-        int mid = toIndex;
-        int high = mid - 1;
-        int result = -1;
-        while (low <= high)
-        {
-            mid = (low + high) >> 1;
-            if ((result = column.cellPathComparator().compare(path, cells[mid].path())) > 0)
-                low = mid + 1;
-            else if (result == 0)
-                return mid;
-            else
-                high = mid - 1;
-        }
-        return -mid - (result < 0 ? 1 : 2);
+        Cell current = (Cell) BTree.<Object>find(cells, column.asymmetricCellPathComparator(), path);
+        BTree.replaceInSitu(cells, column.cellComparator(), current, current.withUpdatedValue(value));
     }
 
     @Override
@@ -293,7 +199,7 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
         ComplexColumnData that = (ComplexColumnData)other;
         return this.column().equals(that.column())
             && this.complexDeletion().equals(that.complexDeletion)
-            && Arrays.equals(this.cells, that.cells);
+            && BTree.equals(this.cells, that.cells);
     }
 
     @Override
@@ -309,15 +215,20 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
 
     public static class Builder
     {
-        private ColumnDefinition column;
+        private static BiFunction<Cell, Cell, Cell> noResolve = (a, b) -> {
+            throw new IllegalStateException();
+        };
+
         private DeletionTime complexDeletion;
-        public final List<Cell> cells = new ArrayList<>();
+        private ColumnDefinition column;
+        private BTree.Builder<Cell> builder;
 
         public void newColumn(ColumnDefinition column)
         {
             this.column = column;
             this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called
-            this.cells.clear();
+            if (builder == null) builder = BTree.builder(column.cellComparator());
+            else builder.reuse(column.cellComparator());
         }
 
         public void addComplexDeletion(DeletionTime complexDeletion)
@@ -328,16 +239,15 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
         public void addCell(Cell cell)
         {
             assert cell.column().equals(column);
-            assert cells.isEmpty() || cell.column().cellPathComparator().compare(cells.get(cells.size() - 1).path(), cell.path()) < 0;
-            cells.add(cell);
+            builder.add(cell);
         }
 
         public ComplexColumnData build()
         {
-            if (complexDeletion.isLive() && cells.isEmpty())
+            if (complexDeletion.isLive() && builder.isEmpty())
                 return null;
 
-            return new ComplexColumnData(column, cells.toArray(new Cell[cells.size()]), complexDeletion);
+            return new ComplexColumnData(column, builder.build(), complexDeletion);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index ad21c69..929a5fb 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
  * Storage engine representation of a row.
@@ -381,7 +383,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
             // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
             return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
                  ? null
-                 : ArrayBackedRow.create(clustering, columns, rowInfo, rowDeletion, dataBuffer.size(), dataBuffer.toArray(new ColumnData[dataBuffer.size()]));
+                 : BTreeBackedRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
         }
 
         public Clustering mergedClustering()
@@ -463,7 +465,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
                         cellReducer.setActiveDeletion(activeDeletion);
                     }
 
-                    Iterator<Cell> cells = MergeIterator.get(complexCells, ColumnData.cellComparator, cellReducer);
+                    Iterator<Cell> cells = MergeIterator.get(complexCells, ColumnData.comparator, cellReducer);
                     while (cells.hasNext())
                     {
                         Cell merged = cells.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 122f7d3..bacd591 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -49,7 +49,7 @@ public abstract class Rows
 
     private Rows() {}
 
-    public static final Row EMPTY_STATIC_ROW = ArrayBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
+    public static final Row EMPTY_STATIC_ROW = BTreeBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
 
     public static Row.Builder copy(Row row, Row.Builder builder)
     {
@@ -217,7 +217,7 @@ public abstract class Rows
     public static Row merge(Row row1, Row row2, int nowInSec)
     {
         Columns mergedColumns = row1.columns().mergeTo(row2.columns());
-        Row.Builder builder = ArrayBackedRow.sortedBuilder(mergedColumns);
+        Row.Builder builder = BTreeBackedRow.sortedBuilder(mergedColumns);
         merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index ec46751..3a12584 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer
         final SerializationHeader sHeader = header.sHeader;
         return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
         {
-            private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
+            private final Row.Builder builder = BTreeBackedRow.sortedBuilder(sHeader.columns().regulars);
 
             protected Unfiltered computeNext()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 5f110bb..650a18d 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -541,7 +541,7 @@ public abstract class UnfilteredRowIterators
                 {
                     Row merged = rowMerger.merge(markerMerger.activeDeletion());
                     if (listener != null)
-                        listener.onMergedRows(merged == null ? ArrayBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+                        listener.onMergedRows(merged == null ? BTreeBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
                     return merged;
                 }
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 358c841..f6eb62a 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -341,7 +341,7 @@ public class UnfilteredSerializer
     {
         int flags = in.readUnsignedByte();
         assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
-        Row.Builder builder = ArrayBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
+        Row.Builder builder = BTreeBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
         builder.newRow(Clustering.STATIC_CLUSTERING);
         return deserializeRowBody(in, header, helper, flags, builder);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 56b621a..5a0a0ad 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -71,7 +71,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
         {
             super(metadata, in, helper);
             this.header = header;
-            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+            this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public Row readStaticRow() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 515a0d8..a1b5c96 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -214,7 +214,7 @@ public class DataResolver extends ResponseResolver
             {
                 if (currentRows[i] == null)
                 {
-                    currentRows[i] = ArrayBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+                    currentRows[i] = BTreeBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
                     currentRows[i].newRow(clustering);
                 }
                 return currentRows[i];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 37b3476..b38f58e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Longs;
@@ -823,7 +822,7 @@ public class CassandraServer implements Cassandra.Iface
 
             LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
             Cell cell = cellFromColumn(metadata, name, column);
-            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
 
             mutation = new org.apache.cassandra.db.Mutation(update);
         }
@@ -1319,7 +1318,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         else if (column_path.super_column != null && column_path.column == null)
         {
-            Row row = ArrayBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+            Row row = BTreeBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
             update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
         }
         else
@@ -1329,7 +1328,7 @@ public class CassandraServer implements Cassandra.Iface
                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
                 Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path);
-                update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+                update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
             }
             catch (UnknownColumnException e)
             {
@@ -2114,7 +2113,7 @@ public class CassandraServer implements Cassandra.Iface
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
                 Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
 
-                PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+                PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
 
                 org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
                 doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index 9c5a99f..59d1b68 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -112,7 +112,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             super(results);
             assert results.metadata().isStaticCompactTable();
             this.nowInSec = nowInSec;
-            this.builder = ArrayBackedRow.sortedBuilder(results.columns().regulars);
+            this.builder = BTreeBackedRow.sortedBuilder(results.columns().regulars);
         }
 
         private void init()
@@ -220,7 +220,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             this.superColumnMapColumn = results.metadata().compactValueColumn();
             assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
 
-            this.builder = ArrayBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
+            this.builder = BTreeBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
             this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 6e15638..62942b4 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -20,10 +20,15 @@ package org.apache.cassandra.utils.btree;
 
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 
@@ -68,11 +73,15 @@ public class BTree
     // An empty BTree branch - used only for internal purposes in Modifier
     static final Object[] EMPTY_BRANCH = new Object[] { null, new int[0] };
 
-    /**
-     * Returns an empty BTree
-     *
-     * @return
-     */
+    // direction of iteration
+    public static enum Dir
+    {
+        ASC, DESC;
+        public Dir invert() { return this == ASC ? DESC : ASC; }
+        public static Dir asc(boolean asc) { return asc ? ASC : DESC; }
+        public static Dir desc(boolean desc) { return desc ? DESC : ASC; }
+    }
+
     public static Object[] empty()
     {
         return EMPTY_LEAF;
@@ -85,18 +94,35 @@ public class BTree
 
     public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF)
     {
-        return build(source, source.size(), updateF);
+        return buildInternal(source, source.size(), updateF);
+    }
+
+    public static <C, K extends C, V extends C> Object[] build(Iterable<K> source, UpdateFunction<K, V> updateF)
+    {
+        return buildInternal(source, -1, updateF);
     }
 
     /**
      * Creates a BTree containing all of the objects in the provided collection
      *
      * @param source  the items to build the tree with. MUST BE IN STRICTLY ASCENDING ORDER.
+     * @param size    the size of the source iterable
      * @return        a btree representing the contents of the provided iterable
      */
     public static <C, K extends C, V extends C> Object[] build(Iterable<K> source, int size, UpdateFunction<K, V> updateF)
     {
-        if (size < FAN_FACTOR)
+        if (size < 0)
+            throw new IllegalArgumentException(Integer.toString(size));
+        return buildInternal(source, size, updateF);
+    }
+
+    /**
+     * As build(), except:
+     * @param size    < 0 if size is unknown
+     */
+    private static <C, K extends C, V extends C> Object[] buildInternal(Iterable<K> source, int size, UpdateFunction<K, V> updateF)
+    {
+        if ((size >= 0) & (size < FAN_FACTOR))
         {
             if (size == 0)
                 return EMPTY_LEAF;
@@ -168,17 +194,47 @@ public class BTree
         return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), UpdateFunction.<K>noOp());
     }
 
+    public static <V> Iterator<V> iterator(Object[] btree)
+    {
+        return iterator(btree, Dir.ASC);
+    }
+
+    public static <V> Iterator<V> iterator(Object[] btree, Dir dir)
+    {
+        return new BTreeSearchIterator<V, V>(btree, null, dir);
+    }
+
+    public static <V> Iterator<V> iterator(Object[] btree, int lb, int ub, Dir dir)
+    {
+        return new BTreeSearchIterator<V, V>(btree, null, dir, lb, ub);
+    }
+
+    public static <V> Iterable<V> iterable(Object[] btree)
+    {
+        return iterable(btree, Dir.ASC);
+    }
+
+    public static <V> Iterable<V> iterable(Object[] btree, Dir dir)
+    {
+        return () -> iterator(btree, dir);
+    }
+
+    public static <V> Iterable<V> iterable(Object[] btree, int lb, int ub, Dir dir)
+    {
+        return () -> iterator(btree, lb, ub, dir);
+    }
+
     /**
      * Returns an Iterator over the entire tree
      *
-     * @param btree    the tree to iterate over
-     * @param forwards if false, the iterator will start at the end and move backwards
+     * @param btree  the tree to iterate over
+     * @param dir    direction of iteration
      * @param <V>
      * @return
      */
-    public static <V> BTreeSearchIterator<V, V> slice(Object[] btree, Comparator<? super V> comparator, boolean forwards)
+    public static <K, V> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, Dir dir)
     {
-        return new BTreeSearchIterator<>(btree, comparator, forwards);
+        return new BTreeSearchIterator<>(btree, comparator, dir);
     }
 
     /**
@@ -186,12 +242,12 @@ public class BTree
      * @param comparator the comparator that defines the ordering over the items in the tree
      * @param start      the beginning of the range to return, inclusive (in ascending order)
      * @param end        the end of the range to return, exclusive (in ascending order)
-     * @param forwards   if false, the iterator will start at the last item and move backwards
+     * @param dir   if false, the iterator will start at the last item and move backwards
      * @return           an Iterator over the defined sub-range of the tree
      */
-    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, K end, boolean forwards)
+    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, K end, Dir dir)
     {
-        return slice(btree, comparator, start, true, end, false, forwards);
+        return slice(btree, comparator, start, true, end, false, dir);
     }
 
     /**
@@ -201,10 +257,10 @@ public class BTree
      * @param startInclusive inclusivity of lower bound
      * @param end            high bound of the range
      * @param endInclusive   inclusivity of higher bound
-     * @param forwards       if false, the iterator will start at end and move backwards
-     * @return           an Iterator over the defined sub-range of the tree
+     * @param dir            direction of iteration
+     * @return               an Iterator over the defined sub-range of the tree
      */
-    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, boolean startInclusive, K end, boolean endInclusive, boolean forwards)
+    public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, boolean startInclusive, K end, boolean endInclusive, Dir dir)
     {
         int inclusiveLowerBound = max(0,
                                       start == null ? Integer.MIN_VALUE
@@ -214,14 +270,61 @@ public class BTree
                                       end == null ? Integer.MAX_VALUE
                                                   : endInclusive ? floorIndex(btree, comparator, end)
                                                                  : lowerIndex(btree, comparator, end));
-        return new BTreeSearchIterator<>(btree, comparator, forwards, inclusiveLowerBound, inclusiveUpperBound);
+        return new BTreeSearchIterator<>(btree, comparator, dir, inclusiveLowerBound, inclusiveUpperBound);
+    }
+
+    /**
+     * @return the item in the tree that sorts as equal to the search argument, or null if no such item
+     */
+    public static <V> V find(Object[] node, Comparator<? super V> comparator, V find)
+    {
+        while (true)
+        {
+            int keyEnd = getKeyEnd(node);
+            int i = Arrays.binarySearch((V[]) node, 0, keyEnd, find, comparator);
+
+            if (i >= 0)
+                return (V) node[i];
+
+            if (isLeaf(node))
+                return null;
+
+            i = -1 - i;
+            node = (Object[]) node[keyEnd + i];
+        }
+    }
+
+    /**
+     * Modifies the provided btree directly. THIS SHOULD NOT BE USED WITHOUT EXTREME CARE as BTrees are meant to be immutable.
+     * Finds and replaces the provided item in the tree. Both should sort as equal to each other (although this is not enforced)
+     */
+    public static <V> void replaceInSitu(Object[] node, Comparator<? super V> comparator, V find, V replace)
+    {
+        while (true)
+        {
+            int keyEnd = getKeyEnd(node);
+            int i = Arrays.binarySearch((V[]) node, 0, keyEnd, find, comparator);
+
+            if (i >= 0)
+            {
+                assert find == node[i];
+                node[i] = replace;
+                return;
+            }
+
+            if (isLeaf(node))
+                throw new NoSuchElementException();
+
+            i = -1 - i;
+            node = (Object[]) node[keyEnd + i];
+        }
     }
 
     /**
      * Honours result semantics of {@link Arrays#binarySearch}, as though it were performed on the tree flattened into an array
      * @return index of item in tree, or <tt>(-(<i>insertion point</i>) - 1)</tt> if not present
      */
-    public static <V> int findIndex(Object[] node, Comparator<V> comparator, V find)
+    public static <V> int findIndex(Object[] node, Comparator<? super V> comparator, V find)
     {
         int lb = 0;
         while (true)
@@ -291,7 +394,6 @@ public class BTree
      * (having less height, and operating over only primitive arrays), and the clarity is compelling
      */
 
-
     public static <V> int lowerIndex(Object[] btree, Comparator<? super V> comparator, V find)
     {
         int i = findIndex(btree, comparator, find);
@@ -425,6 +527,16 @@ public class BTree
         return ((int[]) tree[length - 1])[(length / 2) - 1];
     }
 
+    public static long sizeOfStructureOnHeap(Object[] tree)
+    {
+        long size = ObjectSizes.sizeOfArray(tree);
+        if (isLeaf(tree))
+            return size;
+        for (int i = getChildStart(tree) ; i < getChildEnd(tree) ; i++)
+            size += sizeOfStructureOnHeap((Object[]) tree[i]);
+        return size;
+    }
+
     // returns true if the provided node is a leaf, false if it is a branch
     static boolean isLeaf(Object[] node)
     {
@@ -485,6 +597,81 @@ public class BTree
         return newTargetOffset - targetOffset;
     }
 
+    // simple class for avoiding duplicate transformation work
+    private static class FiltrationTracker<V> implements Function<V, V>
+    {
+        final Function<? super V, ? extends V> wrapped;
+        int index;
+        boolean failed;
+
+        private FiltrationTracker(Function<? super V, ? extends V> wrapped)
+        {
+            this.wrapped = wrapped;
+        }
+
+        public V apply(V i)
+        {
+            V o = wrapped.apply(i);
+            if (o != null) index++;
+            else failed = true;
+            return o;
+        }
+    }
+
+    /**
+     * Takes a btree and transforms it using the provided function, filtering out any null results.
+     * The result of any transformation must sort identically wrt the other results as their originals
+     */
+    public static <V> Object[] transformAndFilter(Object[] btree, Function<? super V, ? extends V> function)
+    {
+        if (isEmpty(btree))
+            return btree;
+
+        // TODO: can be made more efficient
+        FiltrationTracker<V> wrapped = new FiltrationTracker<>(function);
+        Object[] result = transformAndFilter(btree, wrapped);
+        if (!wrapped.failed)
+            return result;
+
+        // take the already transformed bits from the head of the partial result
+        Iterable<V> head = iterable(result, 0, wrapped.index - 1, Dir.ASC);
+        // and concatenate with remainder of original tree, with transformation applied
+        Iterable<V> remainder = iterable(btree, wrapped.index + 1, size(btree) - 1, Dir.ASC);
+        remainder = filter(transform(remainder, function), (x) -> x != null);
+        Iterable<V> build = concat(head, remainder);
+
+        return buildInternal(build, -1, UpdateFunction.<V>noOp());
+    }
+
+    private static <V> Object[] transformAndFilter(Object[] btree, FiltrationTracker<V> function)
+    {
+        Object[] result = btree;
+        boolean isLeaf = isLeaf(btree);
+        int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree);
+        int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
+        for (int i = 0 ; i < limit ; i++)
+        {
+            // we want to visit in iteration order, so we visit our key nodes inbetween our children
+            int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0);
+            Object current = btree[idx];
+            Object updated = idx < childOffset ? function.apply((V) current) : transformAndFilter((Object[]) current, function);
+            if (updated != current)
+            {
+                if (result == btree)
+                    result = btree.clone();
+                result[idx] = updated;
+            }
+            if (function.failed)
+                return result;
+        }
+        return result;
+    }
+
+    public static boolean equals(Object[] a, Object[] b)
+    {
+        return Iterators.elementsEqual(iterator(a), iterator(b));
+    }
+
     /**
      * tree index => index of key wrt all items in the tree laid out serially
      *
@@ -553,28 +740,60 @@ public class BTree
 
     public static class Builder<V>
     {
+        public static interface Resolver
+        {
+            // can return a different output type to input, so long as sort order is maintained
+            // if a resolver is present, this method will be called for every sequence of equal inputs
+            // even those with only one item
+            Object resolve(Object[] array, int lb, int ub);
+        }
 
-        final Comparator<? super V> comparator;
+        Comparator<? super V> comparator;
         Object[] values = new Object[10];
         int count;
-        boolean detected; // true if we have managed to cheaply ensure sorted + filtered as we have added
+        boolean detected; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
+        boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
 
         protected Builder(Comparator<? super V> comparator)
         {
             this.comparator = comparator;
         }
 
+        public void reuse()
+        {
+            reuse(comparator);
+        }
+
+        public void reuse(Comparator<? super V> comparator)
+        {
+            this.comparator = comparator;
+            count = 0;
+            detected = true;
+        }
+
+        public Builder<V> auto(boolean auto)
+        {
+            this.auto = auto;
+            return this;
+        }
+
         public Builder<V> add(V v)
         {
             if (count == values.length)
                 values = Arrays.copyOf(values, count * 2);
             values[count++] = v;
 
-            if (detected && count > 1)
+            if (auto && detected && count > 1)
             {
                 int c = comparator.compare((V) values[count - 2], (V) values[count - 1]);
-                if (c == 0) count--;
-                else if (c > 0) detected = false;
+                if (c == 0 && auto)
+                {
+                    count--;
+                }
+                else if (c > 0)
+                {
+                    detected = false;
+                }
             }
 
             return this;
@@ -582,9 +801,10 @@ public class BTree
 
         public Builder<V> addAll(Collection<V> add)
         {
-            if (add instanceof SortedSet && equalComparators(comparator, ((SortedSet) add).comparator()))
+            if (auto && add instanceof SortedSet && equalComparators(comparator, ((SortedSet) add).comparator()))
             {
                 // if we're a SortedSet, permit quick order-preserving addition of items
+                // if we collect all duplicates, don't bother as merge will necessarily be more expensive than sorting at end
                 return mergeAll(add, add.size());
             }
             detected = false;
@@ -606,15 +826,15 @@ public class BTree
         }
 
         // iter must be in sorted order!
-        public Builder<V> mergeAll(Iterable<V> add, int addCount)
+        private Builder<V> mergeAll(Iterable<V> add, int addCount)
         {
+            assert auto;
             // ensure the existing contents are in order
-            sortAndFilter();
+            autoEnforce();
 
             int curCount = count;
             // we make room for curCount * 2 + addCount, so that we can copy the current values to the end
             // if necessary for continuing the merge, and have the new values directly after the current value range
-            // i.e. []
             if (values.length < curCount * 2 + addCount)
                 values = Arrays.copyOf(values, max(curCount * 2 + addCount, curCount * 3));
 
@@ -633,18 +853,20 @@ public class BTree
             return mergeAll(addCount);
         }
 
-        // iter must be in sorted order!
         private Builder<V> mergeAll(int addCount)
         {
-            // start optimistically by assuming new values are superset of current, and just run until this fails to hold
             Object[] a = values;
             int addOffset = count;
 
             int i = 0, j = addOffset;
             int curEnd = addOffset, addEnd = addOffset + addCount;
+
+            // save time in cases where we already have a subset, by skipping dir
             while (i < curEnd && j < addEnd)
             {
-                int c = comparator.compare((V) a[i], (V) a[j]);
+                V ai = (V) a[i], aj = (V) a[j];
+                // in some cases, such as Columns, we may have identity supersets, so perform a cheap object-identity check
+                int c = ai == aj ? 0 : comparator.compare(ai, aj);
                 if (c > 0)
                     break;
                 else if (c == 0)
@@ -698,11 +920,18 @@ public class BTree
             return count == 0;
         }
 
-        private void sortAndFilter()
+        public Builder<V> sort()
+        {
+            Arrays.sort((V[]) values, 0, count, comparator);
+            return this;
+        }
+
+        // automatically enforce sorted+filtered
+        private void autoEnforce()
         {
             if (!detected && count > 1)
             {
-                Arrays.sort((V[]) values, 0, count, comparator);
+                sort();
                 int c = 1;
                 for (int i = 1 ; i < count ; i++)
                     if (comparator.compare((V) values[i], (V) values[i - 1]) != 0)
@@ -712,9 +941,30 @@ public class BTree
             detected = true;
         }
 
+        public Builder<V> resolve(Resolver resolver)
+        {
+            if (count > 0)
+            {
+                int c = 0;
+                int prev = 0;
+                for (int i = 1 ; i < count ; i++)
+                {
+                    if (comparator.compare((V) values[i], (V) values[prev]) != 0)
+                    {
+                        values[c++] = resolver.resolve((V[]) values, prev, i);
+                        prev = i;
+                    }
+                }
+                values[c++] = resolver.resolve((V[]) values, prev, count);
+                count = c;
+            }
+            return this;
+        }
+
         public Object[] build()
         {
-            sortAndFilter();
+            if (auto)
+                autoEnforce();
             return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index d6e6fae..6d023d2 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -42,15 +42,15 @@ public class BTreeSearchIterator<K, V> extends TreeCursor<K> implements IndexedS
     private static final int LAST = 4; // may co-exist with ON_ITEM, in which case we are also at END
     private static final int END = 5; // equal to LAST | ON_ITEM
 
-    public BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, boolean forwards)
+    public BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir)
     {
-        this(btree, comparator, forwards, 0, size(btree)-1);
+        this(btree, comparator, dir, 0, size(btree)-1);
     }
 
-    BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, boolean forwards, int lowerBound, int upperBound)
+    BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir, int lowerBound, int upperBound)
     {
         super(comparator, btree);
-        this.forwards = forwards;
+        this.forwards = dir == BTree.Dir.ASC;
         this.lowerBound = lowerBound;
         this.upperBound = upperBound;
         rewind();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index 7646693..03fa1ec 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
+import org.apache.cassandra.utils.btree.BTree.Dir;
+
 import static org.apache.cassandra.utils.btree.BTree.findIndex;
 import static org.apache.cassandra.utils.btree.BTree.lower;
 import static org.apache.cassandra.utils.btree.BTree.toArray;
@@ -50,9 +52,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
         return comparator;
     }
 
-    protected BTreeSearchIterator<V, V> slice(boolean forwards)
+    protected BTreeSearchIterator<V, V> slice(Dir dir)
     {
-        return BTree.slice(tree, comparator, forwards);
+        return BTree.slice(tree, comparator, dir);
     }
 
     public Object[] tree()
@@ -101,13 +103,13 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
     @Override
     public BTreeSearchIterator<V, V> iterator()
     {
-        return slice(true);
+        return slice(Dir.ASC);
     }
 
     @Override
     public BTreeSearchIterator<V, V> descendingIterator()
     {
-        return slice(false);
+        return slice(Dir.DESC);
     }
 
     @Override
@@ -360,9 +362,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
         }
 
         @Override
-        protected BTreeSearchIterator<V, V> slice(boolean forwards)
+        protected BTreeSearchIterator<V, V> slice(Dir dir)
         {
-            return new BTreeSearchIterator<>(tree, comparator, forwards, lowerBound, upperBound);
+            return new BTreeSearchIterator<>(tree, comparator, dir, lowerBound, upperBound);
         }
 
         @Override
@@ -485,9 +487,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
         }
 
         @Override
-        protected BTreeSearchIterator<V, V> slice(boolean forwards)
+        protected BTreeSearchIterator<V, V> slice(Dir dir)
         {
-            return super.slice(!forwards);
+            return super.slice(dir.invert());
         }
 
         /* Flip the methods we call for inequality searches */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java b/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
index 83fc661..bba36c3 100644
--- a/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
@@ -263,7 +263,8 @@ final class NodeBuilder
     // builds a new root BTree node - must be called on root of operation
     Object[] toNode()
     {
-        assert buildKeyPosition <= FAN_FACTOR && (buildKeyPosition > 0 || getKeyEnd(copyFrom) > 0) : buildKeyPosition;
+        // we permit building empty trees as some constructions do not know in advance how many items they will contain
+        assert buildKeyPosition <= FAN_FACTOR : buildKeyPosition;
         return buildFromRange(0, buildKeyPosition, isLeaf(copyFrom), false);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeCursor.java b/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
index 9e946f9..8523810 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
@@ -124,8 +124,8 @@ class TreeCursor<K> extends NodeCursor<K>
 
             int cmp;
             if (key == test) cmp = 0; // check object identity first, since we utilise that in some places and it's very cheap
-            else cmp = comparator.compare(key, test);
-            if (forwards ? cmp <= 0 : cmp >= 0)
+            else cmp = comparator.compare(test, key); // order of provision matters for asymmetric comparators
+            if (forwards ? cmp >= 0 : cmp <= 0)
             {
                 // we've either matched, or excluded the value from being present
                 int index = cur.globalIndex();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index a470527..8fd470f 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.rows.ArrayBackedRow;
+import org.apache.cassandra.db.rows.BTreeBackedRow;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -51,13 +51,13 @@ public abstract class AbstractAllocator
         return new CloningArrayBackedRowBuilder(columns, this);
     }
 
-    private static class CloningArrayBackedRowBuilder extends ArrayBackedRow.SortedBuilder
+    private static class CloningArrayBackedRowBuilder extends BTreeBackedRow.Builder
     {
         private final AbstractAllocator allocator;
 
         private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator)
         {
-            super(columns);
+            super(columns, true);
             this.allocator = allocator;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
index 60116b9..37866d2 100644
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
-import com.google.common.collect.Iterators;
+import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,15 +43,21 @@ import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.utils.btree.*;
 
+import static com.google.common.base.Predicates.notNull;
 import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
 import static java.util.Comparator.naturalOrder;
 import static java.util.Comparator.reverseOrder;
+import static org.apache.cassandra.utils.btree.BTree.iterable;
+import static org.junit.Assert.assertTrue;
 
 public class LongBTreeTest
 {
 
     private static int perThreadTrees = 10000;
-    private static final boolean DEBUG = false;
+    private static int minTreeSize = 5;
+    private static int maxTreeSize = 15;
+    private static final boolean DEBUG = true;
     private static final MetricRegistry metrics = new MetricRegistry();
     private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
     private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
@@ -108,36 +114,36 @@ public class LongBTreeTest
     {
         final int perTreeSelections = 2;
         testRandomSelectionOfSet(perThreadTrees, perTreeSelections,
-         (test, canonical) -> {
-             if (!canonical.isEmpty() || !test.isEmpty())
-             {
-                 Assert.assertEquals(canonical.isEmpty(), test.isEmpty());
-                 Assert.assertEquals(canonical.first(), test.first());
-                 Assert.assertEquals(canonical.last(), test.last());
-             }
-             return (key) ->
-             {
-                 Assert.assertEquals(test.ceiling(key), canonical.ceiling(key));
-                 Assert.assertEquals(test.higher(key), canonical.higher(key));
-                 Assert.assertEquals(test.floor(key), canonical.floor(key));
-                 Assert.assertEquals(test.lower(key), canonical.lower(key));
-             };
-         });
+                                 (test, canonical) -> {
+                                     if (!canonical.isEmpty() || !test.isEmpty())
+                                     {
+                                         Assert.assertEquals(canonical.isEmpty(), test.isEmpty());
+                                         Assert.assertEquals(canonical.first(), test.first());
+                                         Assert.assertEquals(canonical.last(), test.last());
+                                     }
+                                     return (key) ->
+                                     {
+                                         Assert.assertEquals(test.ceiling(key), canonical.ceiling(key));
+                                         Assert.assertEquals(test.higher(key), canonical.higher(key));
+                                         Assert.assertEquals(test.floor(key), canonical.floor(key));
+                                         Assert.assertEquals(test.lower(key), canonical.lower(key));
+                                     };
+                                 });
     }
 
     @Test
     public void testListIndexes() throws InterruptedException
     {
         testRandomSelectionOfList(perThreadTrees, 4,
-          (test, canonical, cmp) ->
-          (key) ->
-          {
-              int javaIndex = Collections.binarySearch(canonical, key, cmp);
-              int btreeIndex = test.indexOf(key);
-              Assert.assertEquals(javaIndex, btreeIndex);
-              if (javaIndex >= 0)
-                  Assert.assertEquals(canonical.get(javaIndex), test.get(btreeIndex));
-          }
+                                  (test, canonical, cmp) ->
+                                  (key) ->
+                                  {
+                                      int javaIndex = Collections.binarySearch(canonical, key, cmp);
+                                      int btreeIndex = test.indexOf(key);
+                                      Assert.assertEquals(javaIndex, btreeIndex);
+                                      if (javaIndex >= 0)
+                                          Assert.assertEquals(canonical.get(javaIndex), test.get(btreeIndex));
+                                  }
         );
     }
 
@@ -145,14 +151,81 @@ public class LongBTreeTest
     public void testToArray() throws InterruptedException
     {
         testRandomSelection(perThreadTrees, 4,
-          (selection) ->
-          {
-              Integer[] array = new Integer[selection.canonicalList.size() + 1];
-              selection.testAsList.toArray(array, 1);
-              Assert.assertEquals(null, array[0]);
-              for (int j = 0 ; j < selection.canonicalList.size() ; j++)
-                  Assert.assertEquals(selection.canonicalList.get(j), array[j + 1]);
-          });
+                            (selection) ->
+                            {
+                                Integer[] array = new Integer[selection.canonicalList.size() + 1];
+                                selection.testAsList.toArray(array, 1);
+                                Assert.assertEquals(null, array[0]);
+                                for (int j = 0; j < selection.canonicalList.size(); j++)
+                                    Assert.assertEquals(selection.canonicalList.get(j), array[j + 1]);
+                            });
+    }
+
+    private static final class CountingFunction implements Function<Integer, Integer>
+    {
+        final Function<Integer, Integer> wrapped;
+        int count = 0;
+        protected CountingFunction(Function<Integer, Integer> wrapped)
+        {
+            this.wrapped = wrapped;
+        }
+        public Integer apply(Integer integer)
+        {
+            count++;
+            return wrapped.apply(integer);
+        }
+    }
+
+    @Test
+    public void testTransformAndFilter() throws InterruptedException
+    {
+        testRandomSelection(perThreadTrees, 4, false, false, false,
+                            (selection) ->
+                            {
+                                Map<Integer, Integer> update = new LinkedHashMap<>();
+                                for (Integer i : selection.testKeys)
+                                    update.put(i, new Integer(i));
+
+                                CountingFunction function;
+                                Object[] original = selection.testAsSet.tree();
+                                Object[] transformed;
+
+                                // test replacing none, leaving all present
+                                function = new CountingFunction((x) -> x);
+                                transformed = BTree.transformAndFilter(original, function);
+                                Assert.assertEquals(BTree.size(original), function.count);
+                                Assert.assertSame(original, transformed);
+
+                                // test replacing some, leaving all present
+                                function = new CountingFunction((x) -> update.containsKey(x) ? update.get(x) : x);
+                                transformed = BTree.transformAndFilter(original, function);
+                                Assert.assertEquals(BTree.size(original), function.count);
+                                assertSame(transform(selection.canonicalList, function.wrapped), iterable(transformed));
+
+                                // test replacing some, removing some
+                                function = new CountingFunction(update::get);
+                                transformed = BTree.transformAndFilter(original, function);
+                                Assert.assertEquals(BTree.size(original), function.count);
+                                assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
+
+                                // test replacing none, removing some
+                                function = new CountingFunction((x) -> update.containsKey(x) ? null : x);
+                                transformed = BTree.transformAndFilter(selection.testAsList.tree(), function);
+                                Assert.assertEquals(BTree.size(original), function.count);
+                                assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
+                            });
+    }
+
+    private static void assertSame(Iterable<Integer> i1, Iterable<Integer> i2)
+    {
+        assertSame(i1.iterator(), i2.iterator());
+    }
+
+    private static void assertSame(Iterator<Integer> i1, Iterator<Integer> i2)
+    {
+        while (i1.hasNext() && i2.hasNext())
+            Assert.assertSame(i1.next(), i2.next());
+        Assert.assertEquals(i1.hasNext(), i2.hasNext());
     }
 
     private void testRandomSelectionOfList(int perThreadTrees, int perTreeSelections, BTreeListTestFactory testRun) throws InterruptedException
@@ -195,8 +268,14 @@ public class LongBTreeTest
                 testEachKey.testOne(key);
         });
     }
+
     private void testRandomSelection(int perThreadTrees, int perTreeSelections, Consumer<RandomSelection> testRun) throws InterruptedException
     {
+        testRandomSelection(perThreadTrees, perTreeSelections, true, true, true, testRun);
+    }
+
+    private void testRandomSelection(int perThreadTrees, int perTreeSelections, boolean narrow, boolean mixInNotPresentItems, boolean permitReversal, Consumer<RandomSelection> testRun) throws InterruptedException
+    {
         int threads = Runtime.getRuntime().availableProcessors();
         final CountDownLatch latch = new CountDownLatch(threads);
         final AtomicLong errors = new AtomicLong();
@@ -212,10 +291,10 @@ public class LongBTreeTest
                     {
                         for (int i = 0 ; i < perThreadTrees ; i++)
                         {
-                            RandomTree tree = randomTree(100, 10000);
+                            RandomTree tree = randomTree(minTreeSize, maxTreeSize);
                             for (int j = 0 ; j < perTreeSelections ; j++)
                             {
-                                testRun.accept(tree.select());
+                                testRun.accept(tree.select(narrow, mixInNotPresentItems, permitReversal));
                                 count.incrementAndGet();
                             }
                         }
@@ -273,7 +352,7 @@ public class LongBTreeTest
             this.test = test;
         }
 
-        RandomSelection select()
+        RandomSelection select(boolean narrow, boolean mixInNotPresentItems, boolean permitReversal)
         {
             ThreadLocalRandom random = ThreadLocalRandom.current();
             NavigableSet<Integer> canonicalSet = this.canonical;
@@ -285,11 +364,11 @@ public class LongBTreeTest
             Assert.assertEquals(canonicalList.size(), testAsList.size());
 
             // sometimes select keys first, so we cover full range
-            List<Integer> allKeys = randomKeys(canonical);
+            List<Integer> allKeys = randomKeys(canonical, mixInNotPresentItems);
             List<Integer> keys = allKeys;
 
-            int narrow = random.nextInt(3);
-            while (canonicalList.size() > 10 && keys.size() > 10 && narrow-- > 0)
+            int narrowCount = random.nextInt(3);
+            while (narrow && canonicalList.size() > 10 && keys.size() > 10 && narrowCount-- > 0)
             {
                 boolean useLb = random.nextBoolean();
                 boolean useUb = random.nextBoolean();
@@ -297,7 +376,7 @@ public class LongBTreeTest
                     continue;
 
                 // select a range smaller than the total span when we have more narrowing iterations left
-                int indexRange = keys.size() / (narrow + 1);
+                int indexRange = keys.size() / (narrowCount + 1);
 
                 boolean lbInclusive = true;
                 Integer lbKey = canonicalList.get(0);
@@ -354,7 +433,7 @@ public class LongBTreeTest
                 keys = allKeys;
 
             Comparator<Integer> comparator = naturalOrder();
-            if (random.nextBoolean())
+            if (permitReversal && random.nextBoolean())
             {
                 if (allKeys != keys)
                     keys = new ArrayList<>(keys);
@@ -478,10 +557,10 @@ public class LongBTreeTest
 
     // select a random subset of the keys, with an optional random population of keys inbetween those that are present
     // return a value with the search position
-    private static List<Integer> randomKeys(Iterable<Integer> canonical)
+    private static List<Integer> randomKeys(Iterable<Integer> canonical, boolean mixInNotPresentItems)
     {
         ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        boolean useFake = rnd.nextBoolean();
+        boolean useFake = mixInNotPresentItems && rnd.nextBoolean();
         final float fakeRatio = rnd.nextFloat();
         List<Integer> results = new ArrayList<>();
         Long fakeLb = null, fakeUb = null;
@@ -525,8 +604,8 @@ public class LongBTreeTest
         btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp());
         canon.add(Integer.MIN_VALUE);
         canon.add(Integer.MAX_VALUE);
-        Assert.assertTrue(BTree.isWellFormed(btree, naturalOrder()));
-        testEqual("Oversize", BTree.<Integer>slice(btree, naturalOrder(), true), canon.iterator());
+        assertTrue(BTree.isWellFormed(btree, naturalOrder()));
+        testEqual("Oversize", BTree.iterator(btree), canon.iterator());
     }
 
     @Test
@@ -652,7 +731,7 @@ public class LongBTreeTest
                         throw new AssertionError("Not well formed!");
                     }
                     if (quickEquality)
-                        testEqual("", BTree.<Integer>slice(btree, naturalOrder(), true), canon.keySet().iterator());
+                        testEqual("", BTree.iterator(btree), canon.keySet().iterator());
                     else
                         r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 9a97483..373cf6a 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -127,7 +127,7 @@ public class RowTest
         ColumnDefinition defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
         ColumnDefinition defB = cfm.getColumnDefinition(new ColumnIdentifier("b", true));
 
-        Row.Builder builder = ArrayBackedRow.unsortedBuilder(cfm.partitionColumns().regulars, nowInSeconds);
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(cfm.partitionColumns().regulars, nowInSeconds);
         builder.newRow(cfm.comparator.make("c1"));
         writeSimpleCellValue(builder, cfm, defA, "a1", 0);
         writeSimpleCellValue(builder, cfm, defA, "a2", 1);
@@ -152,7 +152,7 @@ public class RowTest
 
         Cell cell = BufferCell.expiring(def, 0, ttl, nowInSeconds, ((AbstractType) def.cellValueType()).decompose("a1"));
 
-        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, ArrayBackedRow.singleCellRow(cfm.comparator.make("c1"), cell));
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, BTreeBackedRow.singleCellRow(cfm.comparator.make("c1"), cell));
         new Mutation(update).applyUnsafe();
 
         // when we read with a nowInSeconds before the cell has expired,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index e6d8cb0..54bb344 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -364,7 +364,7 @@ public class RowAndDeletionMergeIteratorTest
 
     private void addRow(PartitionUpdate update, int col1, int a)
     {
-        update.add(ArrayBackedRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
+        update.add(BTreeBackedRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
     }
 
     private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index fd775b4..c16365a 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -371,7 +371,7 @@ public class UnfilteredRowIteratorsMergeTest
     {
         final Clustering clustering = clusteringFor(pos);
         final LivenessInfo live = LivenessInfo.create(metadata, timeGenerator.apply(pos), nowInSec);
-        return ArrayBackedRow.noCellLiveRow(clustering, live);
+        return BTreeBackedRow.noCellLiveRow(clustering, live);
     }
 
     private void dumpList(List<Unfiltered> list)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index df4b08a..316a23c 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -268,7 +268,7 @@ public class TriggerExecutorTest
 
     private static PartitionUpdate makeCf(CFMetaData metadata, String key, String columnValue1, String columnValue2)
     {
-        Row.Builder builder = ArrayBackedRow.unsortedBuilder(metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
         builder.newRow(Clustering.EMPTY);
         long ts = FBUtilities.timestampMicros();
         if (columnValue1 != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/utils/BTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java
index c004836..ec4c359 100644
--- a/test/unit/org/apache/cassandra/utils/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.utils;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.google.common.collect.Iterables;
 import org.junit.Test;
 
+import junit.framework.Assert;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 
@@ -191,9 +193,80 @@ public class BTreeTest
         assertEquals(1, monitor.getNumberOfCalls(3));
     }
 
+    /**
+     * Tests that the apply method of the <code>UpdateFunction</code> is only called once per value with each build call.
+     */
+    @Test
+    public void testBuilder_Resolver()
+    {
+        // for numbers x in 1..N, we repeat x x times, and resolve values to their sum,
+        // so that the resulting tree is of square numbers
+        BTree.Builder.Resolver resolver = (array, lb, ub) -> {
+            int sum = 0;
+            for (int i = lb ; i < ub ; i++)
+                sum += (Integer) array[i];
+            return sum;
+        };
+
+        for (int count = 0 ; count < 10 ; count ++)
+        {
+            BTree.Builder<Integer> builder;
+            // first check we produce the right output for sorted input
+            List<Integer> sorted = resolverInput(count, false);
+            builder = BTree.builder(Comparator.naturalOrder());
+            builder.auto(false);
+            for (Integer i : sorted)
+                builder.add(i);
+            // for sorted input, check non-resolve path works before checking resolution path
+            Assert.assertTrue(Iterables.elementsEqual(sorted, BTree.iterable(builder.build())));
+            checkResolverOutput(count, builder.resolve(resolver).build());
+            builder = BTree.builder(Comparator.naturalOrder());
+            builder.auto(false);
+            for (int i = 0 ; i < 10 ; i++)
+            {
+                // now do a few runs of randomized inputs
+                for (Integer j : resolverInput(count, true))
+                    builder.add(j);
+                checkResolverOutput(count, builder.sort().resolve(resolver).build());
+                builder.reuse();
+            }
+        }
+    }
+
+    private static List<Integer> resolverInput(int count, boolean shuffled)
+    {
+        List<Integer> result = new ArrayList<>();
+        for (int i = 1 ; i <= count ; i++)
+            for (int j = 0 ; j < i ; j++)
+                result.add(i);
+        if (shuffled)
+        {
+            ThreadLocalRandom random = ThreadLocalRandom.current();
+            for (int i = 0 ; i < result.size() ; i++)
+            {
+                int swapWith = random.nextInt(i, result.size());
+                Integer t = result.get(swapWith);
+                result.set(swapWith, result.get(i));
+                result.set(i, t);
+            }
+        }
+        return result;
+    }
+
+    private static void checkResolverOutput(int count, Object[] btree)
+    {
+        int i = 1;
+        for (Integer current : BTree.<Integer>iterable(btree))
+        {
+            Assert.assertEquals(i * i, current.intValue());
+            i++;
+        }
+        Assert.assertEquals(i, count + 1);
+    }
+
     private static void checkResult(int count, Object[] btree)
     {
-        Iterator<Integer> iter = BTree.slice(btree, CMP, true);
+        Iterator<Integer> iter = BTree.slice(btree, CMP, BTree.Dir.ASC);
         int i = 0;
         while (iter.hasNext())
             assertEquals(iter.next(), ints[i++]);


[2/2] cassandra git commit: Use BTree to back default Row and ComplexColumnData objects

Posted by be...@apache.org.
Use BTree to back default Row and ComplexColumnData objects

patch by benedict; reviewed by branimir for CASSANDRA-9888


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639d4b24
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639d4b24
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639d4b24

Branch: refs/heads/trunk
Commit: 639d4b240c084900b6589222a0984babfc1890b1
Parents: de420e5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 23 14:22:16 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Jul 28 15:00:27 2015 +0100

----------------------------------------------------------------------
 .../cassandra/config/ColumnDefinition.java      |  16 +
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cassandra/db/HintedHandOffManager.java      |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   4 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   6 +-
 .../cassandra/db/UnfilteredDeserializer.java    |   2 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |   4 +-
 .../db/index/composites/CompositesIndex.java    |   2 +-
 .../AbstractThreadUnsafePartition.java          |   2 +-
 .../db/partitions/AtomicBTreePartition.java     |   7 +-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |   8 +-
 .../cassandra/db/rows/ArrayBackedRow.java       | 927 -------------------
 .../cassandra/db/rows/BTreeBackedRow.java       | 535 +++++++++++
 .../apache/cassandra/db/rows/BufferCell.java    |   9 +-
 src/java/org/apache/cassandra/db/rows/Cell.java |  35 +-
 .../apache/cassandra/db/rows/ColumnData.java    |  25 +-
 .../cassandra/db/rows/ComplexColumnData.java    | 208 ++---
 src/java/org/apache/cassandra/db/rows/Row.java  |   6 +-
 src/java/org/apache/cassandra/db/rows/Rows.java |   4 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |   2 +-
 .../db/rows/UnfilteredRowIterators.java         |   2 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |   2 +-
 .../io/sstable/SSTableSimpleIterator.java       |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   2 +-
 .../cassandra/thrift/CassandraServer.java       |   9 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |   4 +-
 .../org/apache/cassandra/utils/btree/BTree.java | 320 ++++++-
 .../utils/btree/BTreeSearchIterator.java        |   8 +-
 .../apache/cassandra/utils/btree/BTreeSet.java  |  18 +-
 .../cassandra/utils/btree/NodeBuilder.java      |   3 +-
 .../cassandra/utils/btree/TreeCursor.java       |   4 +-
 .../utils/memory/AbstractAllocator.java         |   6 +-
 .../apache/cassandra/utils/LongBTreeTest.java   | 173 +++-
 test/unit/org/apache/cassandra/db/RowTest.java  |   4 +-
 .../rows/RowAndDeletionMergeIteratorTest.java   |   2 +-
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   2 +-
 .../cassandra/triggers/TriggerExecutorTest.java |   2 +-
 .../org/apache/cassandra/utils/BTreeTest.java   |  75 +-
 39 files changed, 1200 insertions(+), 1252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 54a00f5..8d8a929 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.exceptions.*;
 
 public class ColumnDefinition extends ColumnSpecification implements Comparable<ColumnDefinition>
 {
+    public static final Comparator<Object> asymmetricColumnDataComparator = (a, b) -> ((ColumnData) a).column().compareTo((ColumnDefinition) b);
+
     /*
      * The type of CQL3 column this definition represents.
      * There is 4 main type of CQL3 columns: those parts of the partition key,
@@ -70,6 +72,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
     private final Integer componentIndex;
 
     private final Comparator<CellPath> cellPathComparator;
+    private final Comparator<Object> asymmetricCellPathComparator;
+    private final Comparator<? super Cell> cellComparator;
 
     /**
      * These objects are compared frequently, so we encode several of their comparison components
@@ -156,6 +160,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         this.componentIndex = componentIndex;
         this.setIndexType(indexType, indexOptions);
         this.cellPathComparator = makeCellPathComparator(kind, validator);
+        this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
+        this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
         this.comparisonOrder = comparisonOrder(kind, isComplex(), position());
     }
 
@@ -407,6 +413,16 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         return cellPathComparator;
     }
 
+    public Comparator<Object> asymmetricCellPathComparator()
+    {
+        return asymmetricCellPathComparator;
+    }
+
+    public Comparator<? super Cell> cellComparator()
+    {
+        return cellComparator;
+    }
+
     public boolean isComplex()
     {
         return cellPathComparator != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8dcb7e5..519eb4b 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -120,13 +120,13 @@ public class UpdateParameters
         if (clustering == Clustering.STATIC_CLUSTERING)
         {
             if (staticBuilder == null)
-                staticBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
+                staticBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
             builder = staticBuilder;
         }
         else
         {
             if (regularBuilder == null)
-                regularBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
+                regularBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
             builder = regularBuilder;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 234ab97..6ff880c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -137,7 +137,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
         Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
 
-        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, ArrayBackedRow.singleCellRow(clustering, cell)));
+        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeBackedRow.singleCellRow(clustering, cell)));
     }
 
     /*
@@ -181,7 +181,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     {
         DecoratedKey dk =  StorageService.getPartitioner().decorateKey(tokenBytes);
         Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, ArrayBackedRow.singleCellRow(clustering, cell));
+        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell));
         new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 501dbb2..696c1c9 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -353,7 +353,7 @@ public abstract class LegacyLayout
         for (ColumnDefinition column : statics)
             columnsToFetch.add(column.name.bytes);
 
-        Row.Builder builder = ArrayBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
+        Row.Builder builder = BTreeBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
         builder.newRow(Clustering.STATIC_CLUSTERING);
 
         boolean foundOne = false;
@@ -822,7 +822,7 @@ public abstract class LegacyLayout
             this.metadata = metadata;
             this.isStatic = isStatic;
             this.helper = helper;
-            this.builder = ArrayBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
+            this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
         }
 
         public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 71b7bd8..c06a7f7 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -80,7 +80,7 @@ public class RowUpdateBuilder
         assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
         assert regularBuilder == null : "Cannot add the clustering twice to the same row";
 
-        regularBuilder = ArrayBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
+        regularBuilder = BTreeBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
         regularBuilder.newRow(clustering);
 
         // If a CQL table, add the "row marker"
@@ -105,7 +105,7 @@ public class RowUpdateBuilder
         assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
         if (staticBuilder == null)
         {
-            staticBuilder = ArrayBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
+            staticBuilder = BTreeBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
             staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
         }
         return staticBuilder;
@@ -186,7 +186,7 @@ public class RowUpdateBuilder
         assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
 
         boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
-        Row.Builder builder = ArrayBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
+        Row.Builder builder = BTreeBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
 
         if (isStatic)
             builder.newRow(Clustering.STATIC_CLUSTERING);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 36a372f..c00597a 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -121,7 +121,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.header = header;
             this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
-            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+            this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public boolean hasNext() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index c3a3c08..842cbb9 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -115,7 +115,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
 
-        Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
+        Row row = BTreeBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
@@ -132,7 +132,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
 
-        Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
+        Row row = BTreeBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 
         if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index e073802..42861c5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -112,7 +112,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
     {
-        Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
+        Row row = BTreeBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
         PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
         indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
index d79ab06..a716768 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -167,7 +167,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab
                     activeDeletion = rt.deletionTime();
 
                 if (row == null)
-                    return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+                    return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
 
                 return row.filter(columns, activeDeletion, true, metadata);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c06ffd5..e8ec4c0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.service.StorageService;
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
 /**
  * A thread-safe and atomic Partition implementation.
@@ -164,7 +165,7 @@ public class AtomicBTreePartition implements Partition
         final Holder current = ref;
         return new SearchIterator<Clustering, Row>()
         {
-            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
             private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 
             public boolean hasNext()
@@ -188,7 +189,7 @@ public class AtomicBTreePartition implements Partition
                     activeDeletion = rt.deletionTime();
 
                 if (row == null)
-                    return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+                    return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
 
                 return row.filter(columns, activeDeletion, true, metadata);
             }
@@ -235,7 +236,7 @@ public class AtomicBTreePartition implements Partition
     {
         Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
         Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
-        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, !reversed);
+        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
 
         return new RowAndDeletionMergeIterator(metadata,
                                                partitionKey,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 689b832..102008f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -822,8 +822,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         {
             // This is a bit of a giant hack as this is the only place where we mutate a Row object. This makes it more efficient
             // for counters however and this won't be needed post-#6506 so that's probably fine.
-            assert row instanceof ArrayBackedRow;
-            ((ArrayBackedRow)row).setValue(column, path, value);
+            assert row instanceof BTreeBackedRow;
+            ((BTreeBackedRow)row).setValue(column, path, value);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 807741a..f53322a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.security.MessageDigest;
 import java.util.Objects;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -32,8 +33,13 @@ import org.apache.cassandra.utils.FBUtilities;
  * Unless you have a very good reason not to, every cell implementation
  * should probably extend this class.
  */
-public abstract class AbstractCell implements Cell
+public abstract class AbstractCell extends Cell
 {
+    protected AbstractCell(ColumnDefinition column)
+    {
+        super(column);
+    }
+
     public void digest(MessageDigest digest)
     {
         digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
deleted file mode 100644
index 12b23e1..0000000
--- a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
+++ /dev/null
@@ -1,927 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.Predicate;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Immutable implementation of a Row object.
- */
-public class ArrayBackedRow extends AbstractRow
-{
-    private static final ColumnData[] NO_DATA = new ColumnData[0];
-
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayBackedRow(Clustering.EMPTY, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE));
-
-    private final Clustering clustering;
-    private final Columns columns;
-    private final LivenessInfo primaryKeyLivenessInfo;
-    private final DeletionTime deletion;
-
-    // The data for each columns present in this row in column sorted order.
-    private final int size;
-    private final ColumnData[] data;
-
-    // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
-    // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
-    // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
-    // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
-    // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
-    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
-    // no expiring cells, this will be Integer.MAX_VALUE;
-    private final int minLocalDeletionTime;
-
-    private ArrayBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data, int minLocalDeletionTime)
-    {
-        this.clustering = clustering;
-        this.columns = columns;
-        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
-        this.deletion = deletion;
-        this.size = size;
-        this.data = data;
-        this.minLocalDeletionTime = minLocalDeletionTime;
-    }
-
-    // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
-    public static ArrayBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data)
-    {
-        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-        if (minDeletionTime != Integer.MIN_VALUE)
-        {
-            for (int i = 0; i < size; i++)
-                minDeletionTime = Math.min(minDeletionTime, minDeletionTime(data[i]));
-        }
-
-        return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-    }
-
-    public static ArrayBackedRow emptyRow(Clustering clustering)
-    {
-        return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE);
-    }
-
-    public static ArrayBackedRow singleCellRow(Clustering clustering, Cell cell)
-    {
-        if (cell.column().isSimple())
-            return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ cell }, minDeletionTime(cell));
-
-        ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
-        return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ complexData }, minDeletionTime(cell));
-    }
-
-    public static ArrayBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
-    {
-        assert !deletion.isLive();
-        return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, 0, NO_DATA, Integer.MIN_VALUE);
-    }
-
-    public static ArrayBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
-    {
-        assert !primaryKeyLivenessInfo.isEmpty();
-        return new ArrayBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, 0, NO_DATA, minDeletionTime(primaryKeyLivenessInfo));
-    }
-
-    private static int minDeletionTime(Cell cell)
-    {
-        return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
-    }
-
-    private static int minDeletionTime(LivenessInfo info)
-    {
-        return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
-    }
-
-    private static int minDeletionTime(DeletionTime dt)
-    {
-        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
-    }
-
-    private static int minDeletionTime(ComplexColumnData cd)
-    {
-        int min = minDeletionTime(cd.complexDeletion());
-        for (Cell cell : cd)
-            min = Math.min(min, minDeletionTime(cell));
-        return min;
-    }
-
-    private static int minDeletionTime(ColumnData cd)
-    {
-        return cd.column().isSimple() ? minDeletionTime((Cell)cd) : minDeletionTime((ComplexColumnData)cd);
-    }
-
-    public Clustering clustering()
-    {
-        return clustering;
-    }
-
-    public Columns columns()
-    {
-        return columns;
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return primaryKeyLivenessInfo;
-    }
-
-    public DeletionTime deletion()
-    {
-        return deletion;
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        assert !c.isComplex();
-        int idx = binarySearch(c);
-        return idx < 0 ? null : (Cell)data[idx];
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        assert c.isComplex();
-        int idx = binarySearch(c);
-        if (idx < 0)
-            return null;
-
-        return ((ComplexColumnData)data[idx]).getCell(path);
-    }
-
-    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        int idx = binarySearch(c);
-        return idx < 0 ? null : (ComplexColumnData)data[idx];
-    }
-
-    public Iterator<ColumnData> iterator()
-    {
-        return new ColumnDataIterator();
-    }
-
-    public Iterable<Cell> cells()
-    {
-        return CellIterator::new;
-    }
-
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return new ColumnSearchIterator();
-    }
-
-    public Row filter(ColumnFilter filter, CFMetaData metadata)
-    {
-        return filter(filter, DeletionTime.LIVE, false, metadata);
-    }
-
-    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
-    {
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
-
-        if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
-            return this;
-
-        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
-
-        LivenessInfo newInfo = primaryKeyLivenessInfo;
-        DeletionTime newDeletion = deletion;
-        if (mayHaveShadowed)
-        {
-            if (activeDeletion.deletes(newInfo.timestamp()))
-                newInfo = LivenessInfo.EMPTY;
-            // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
-            // the row deletion is shadowed and we shouldn't return it.
-            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
-        }
-
-        ColumnData[] newData = new ColumnData[size];
-        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
-        Columns columns = filter.fetchedColumns().columns(isStatic());
-        Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
-        int newSize = 0;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData cd = data[i];
-            ColumnDefinition column = cd.column();
-            if (!inclusionTester.test(column))
-                continue;
-
-            CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
-            if (column.isSimple())
-            {
-                Cell cell = (Cell)cd;
-                if ((dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)))
-                {
-                    newData[newSize++] = cell;
-                    newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(cell));
-                }
-            }
-            else
-            {
-                ColumnData newCd = ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
-                if (newCd != null)
-                {
-                    newData[newSize++] = newCd;
-                    newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(newCd));
-                }
-            }
-        }
-
-        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
-            return null;
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        // We start by the end cause we know complex columns sort before simple ones
-        for (int i = size - 1; i >= 0; i--)
-        {
-            ColumnData cd = data[i];
-            if (cd.column().isSimple())
-                return false;
-
-            if (!((ComplexColumnData)cd).complexDeletion().isLive())
-                return true;
-        }
-        return false;
-    }
-
-    public Row markCounterLocalToBeCleared()
-    {
-        ColumnData[] newData = null;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData cd = data[i];
-            ColumnData newCd = cd.column().cellValueType().isCounter()
-                             ? cd.markCounterLocalToBeCleared()
-                             : cd;
-            if (newCd != cd)
-            {
-                if (newData == null)
-                    newData = Arrays.copyOf(data, size);
-                newData[i] = newCd;
-            }
-        }
-
-        return newData == null
-             ? this
-             : new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, newData, minLocalDeletionTime);
-    }
-
-    public boolean hasDeletion(int nowInSec)
-    {
-        return nowInSec >= minLocalDeletionTime;
-    }
-
-    /**
-     * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and
-     * all deletion timestamp by {@code newTimestamp - 1}.
-     *
-     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
-     */
-    public Row updateAllTimestamp(long newTimestamp)
-    {
-        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
-        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
-
-        ColumnData[] newData = new ColumnData[size];
-        for (int i = 0; i < size; i++)
-            newData[i] = data[i].updateAllTimestamp(newTimestamp);
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, size, newData, minLocalDeletionTime);
-    }
-
-    public Row purge(DeletionPurger purger, int nowInSec)
-    {
-        if (!hasDeletion(nowInSec))
-            return this;
-
-        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
-        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
-
-        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
-        ColumnData[] newData = new ColumnData[size];
-        int newSize = 0;
-        for (int i = 0; i < size; i++)
-        {
-            ColumnData purged = data[i].purge(purger, nowInSec);
-            if (purged != null)
-            {
-                newData[newSize++] = purged;
-                newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(purged));
-            }
-        }
-
-        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
-            return null;
-
-        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
-    }
-
-    public int dataSize()
-    {
-        int dataSize = clustering.dataSize()
-                     + primaryKeyLivenessInfo.dataSize()
-                     + deletion.dataSize();
-
-        for (int i = 0; i < size; i++)
-            dataSize += data[i].dataSize();
-        return dataSize;
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        long heapSize = EMPTY_SIZE
-                      + clustering.unsharedHeapSizeExcludingData()
-                      + ObjectSizes.sizeOfArray(data);
-
-        for (int i = 0; i < size; i++)
-            heapSize += data[i].unsharedHeapSizeExcludingData();
-        return heapSize;
-    }
-
-    public static Row.Builder sortedBuilder(Columns columns)
-    {
-        return new SortedBuilder(columns);
-    }
-
-    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
-    {
-        return new UnsortedBuilder(columns, nowInSec);
-    }
-
-    // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
-    // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
-    // This method is in particular not exposed by the Row API on purpose.
-    // This method also *assumes* that the cell we're setting already exists.
-    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
-    {
-        int idx = binarySearch(column);
-        assert idx >= 0;
-        if (column.isSimple())
-            data[idx] = ((Cell)data[idx]).withUpdatedValue(value);
-        else
-            ((ComplexColumnData)data[idx]).setValue(path, value);
-    }
-
-    private int binarySearch(ColumnDefinition column)
-    {
-        return binarySearch(column, 0, size);
-    }
-
-    /**
-     * Simple binary search for a given column (in the data list).
-     *
-     * The return value has the exact same meaning that the one of Collections.binarySearch() but
-     * we don't use the later because we're searching for a 'ColumnDefinition' in an array of 'ColumnData'.
-     */
-    private int binarySearch(ColumnDefinition column, int fromIndex, int toIndex)
-    {
-        int low = fromIndex;
-        int mid = toIndex;
-        int high = mid - 1;
-        int result = -1;
-        while (low <= high)
-        {
-            mid = (low + high) >> 1;
-            if ((result = column.compareTo(data[mid].column())) > 0)
-                low = mid + 1;
-            else if (result == 0)
-                return mid;
-            else
-                high = mid - 1;
-        }
-        return -mid - (result < 0 ? 1 : 2);
-    }
-
-    private class ColumnDataIterator extends AbstractIterator<ColumnData>
-    {
-        private int i;
-
-        protected ColumnData computeNext()
-        {
-            return i < size ? data[i++] : endOfData();
-        }
-    }
-
-    private class CellIterator extends AbstractIterator<Cell>
-    {
-        private int i;
-        private Iterator<Cell> complexCells;
-
-        protected Cell computeNext()
-        {
-            while (true)
-            {
-                if (complexCells != null)
-                {
-                    if (complexCells.hasNext())
-                        return complexCells.next();
-
-                    complexCells = null;
-                }
-
-                if (i >= size)
-                    return endOfData();
-
-                ColumnData cd = data[i++];
-                if (cd.column().isComplex())
-                    complexCells = ((ComplexColumnData)cd).iterator();
-                else
-                    return (Cell)cd;
-            }
-        }
-    }
-
-    private class ColumnSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
-    {
-        // The index at which the next call to "next" should start looking from
-        private int searchFrom = 0;
-
-        public boolean hasNext()
-        {
-            return searchFrom < size;
-        }
-
-        public ColumnData next(ColumnDefinition column)
-        {
-            int idx = binarySearch(column, searchFrom, size);
-            if (idx < 0)
-            {
-                searchFrom = -idx - 1;
-                return null;
-            }
-            else
-            {
-                // We've found it. We'll start after it next time.
-                searchFrom = idx + 1;
-                return data[idx];
-            }
-        }
-    }
-
-    private static abstract class AbstractBuilder implements Row.Builder
-    {
-        protected final Columns columns;
-
-        protected Clustering clustering;
-        protected LivenessInfo primaryKeyLivenessInfo;
-        protected DeletionTime deletion;
-
-        protected List<Cell> cells = new ArrayList<>();
-
-        // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
-        protected DeletionTime[] complexDeletions;
-        protected int columnsWithComplexDeletion;
-
-        protected AbstractBuilder(Columns columns)
-        {
-            this.columns = columns;
-            this.complexDeletions = new DeletionTime[columns.complexColumnCount()];
-        }
-
-        public void newRow(Clustering clustering)
-        {
-            assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
-            this.clustering = clustering;
-        }
-
-        public Clustering clustering()
-        {
-            return clustering;
-        }
-
-        protected void reset()
-        {
-            this.clustering = null;
-            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-            this.deletion = DeletionTime.LIVE;
-            this.cells.clear();
-            Arrays.fill(this.complexDeletions, null);
-            this.columnsWithComplexDeletion = 0;
-        }
-
-        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
-        {
-            this.primaryKeyLivenessInfo = info;
-        }
-
-        public void addRowDeletion(DeletionTime deletion)
-        {
-            this.deletion = deletion;
-        }
-
-        public void addCell(Cell cell)
-        {
-            assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
-            cells.add(cell);
-        }
-
-        public Row build()
-        {
-            Row row = buildInternal();
-            reset();
-            return row;
-        }
-
-        protected abstract Row buildInternal();
-
-        protected Row buildNoCells()
-        {
-            assert cells.isEmpty();
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-            if (columnsWithComplexDeletion == 0)
-                return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, 0, NO_DATA, minDeletionTime);
-
-            ColumnData[] data = new ColumnData[columnsWithComplexDeletion];
-            int size = 0;
-            for (int i = 0; i < complexDeletions.length; i++)
-            {
-                DeletionTime complexDeletion = complexDeletions[i];
-                if (complexDeletion != null)
-                {
-                    assert !complexDeletion.isLive();
-                    data[size++] = new ComplexColumnData(columns.getComplex(i), ComplexColumnData.NO_CELLS, complexDeletion);
-                    minDeletionTime = Integer.MIN_VALUE;
-                }
-            }
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-    }
-
-    public static class SortedBuilder extends AbstractBuilder
-    {
-        private int columnCount;
-
-        private ColumnDefinition column;
-
-        // The index of the last column for which we've called setColumn if complex.
-        private int complexColumnIndex;
-
-        // For complex column at index i of 'columns', we store at complexColumnCellsCount[i] its number of added cells.
-        private final int[] complexColumnCellsCount;
-
-        protected SortedBuilder(Columns columns)
-        {
-            super(columns);
-            this.complexColumnCellsCount = new int[columns.complexColumnCount()];
-            reset();
-        }
-
-        @Override
-        protected void reset()
-        {
-            super.reset();
-            this.column = null;
-            this.columnCount = 0;
-            this.complexColumnIndex = -1;
-            Arrays.fill(this.complexColumnCellsCount, 0);
-        }
-
-        public boolean isSorted()
-        {
-            return true;
-        }
-
-        private void setColumn(ColumnDefinition column)
-        {
-            int cmp = this.column == null ? -1 : this.column.compareTo(column);
-            assert cmp <= 0 : "current = " + this.column + ", new = " + column;
-            if (cmp != 0)
-            {
-                this.column = column;
-                ++columnCount;
-                if (column.isComplex())
-                    complexColumnIndex = columns.complexIdx(column, complexColumnIndex + 1);
-            }
-        }
-
-        @Override
-        public void addCell(Cell cell)
-        {
-            setColumn(cell.column());
-            super.addCell(cell);
-            if (column.isComplex())
-                complexColumnCellsCount[complexColumnIndex] += 1;
-        }
-
-        @Override
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
-        {
-            if (complexDeletion.isLive())
-                return;
-
-            setColumn(column);
-            assert complexDeletions[complexColumnIndex] == null;
-            complexDeletions[complexColumnIndex] = complexDeletion;
-            ++columnsWithComplexDeletion;
-        }
-
-        protected Row buildInternal()
-        {
-            if (cells.isEmpty())
-                return buildNoCells();
-
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
-            ColumnData[] data = new ColumnData[columnCount];
-            int complexIdx = 0;
-            int i = 0;
-            int size = 0;
-            while (i < cells.size())
-            {
-                Cell cell = cells.get(i);
-                ColumnDefinition column = cell.column();
-                if (column.isSimple())
-                {
-                    data[size++] = cell;
-                    minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cell));
-                    ++i;
-                }
-                else
-                {
-                    while (columns.getComplex(complexIdx).compareTo(column) < 0)
-                    {
-                        if (complexDeletions[complexIdx] != null)
-                        {
-                            data[size++] = new ComplexColumnData(columns.getComplex(complexIdx), ComplexColumnData.NO_CELLS, complexDeletions[complexIdx]);
-                            minDeletionTime = Integer.MIN_VALUE;
-                        }
-                        ++complexIdx;
-                    }
-
-                    DeletionTime complexDeletion = complexDeletions[complexIdx];
-                    if (complexDeletion != null)
-                        minDeletionTime = Integer.MIN_VALUE;
-                    int cellCount = complexColumnCellsCount[complexIdx];
-                    Cell[] complexCells = new Cell[cellCount];
-                    for (int j = 0; j < cellCount; j++)
-                    {
-                        Cell complexCell = cells.get(i + j);
-                        complexCells[j] = complexCell;
-                        minDeletionTime = Math.min(minDeletionTime, minDeletionTime(complexCell));
-                    }
-                    i += cellCount;
-
-                    data[size++] = new ComplexColumnData(column, complexCells, complexDeletion == null ? DeletionTime.LIVE : complexDeletion);
-                    ++complexIdx;
-                }
-            }
-            for (int j = complexIdx; j < complexDeletions.length; j++)
-            {
-                if (complexDeletions[j] != null)
-                {
-                    data[size++] = new ComplexColumnData(columns.getComplex(j), ComplexColumnData.NO_CELLS, complexDeletions[j]);
-                    minDeletionTime = Integer.MIN_VALUE;
-                }
-            }
-            assert size == data.length;
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-    }
-
-    private static class UnsortedBuilder extends AbstractBuilder
-    {
-        private final int nowInSec;
-
-        private UnsortedBuilder(Columns columns, int nowInSec)
-        {
-            super(columns);
-            this.nowInSec = nowInSec;
-            reset();
-        }
-
-        public boolean isSorted()
-        {
-            return false;
-        }
-
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
-        {
-            assert column.isComplex();
-            assert column.isStatic() == (clustering == Clustering.STATIC_CLUSTERING);
-
-            if (complexDeletion.isLive())
-                return;
-
-            int complexColumnIndex = columns.complexIdx(column, 0);
-
-            DeletionTime previous = complexDeletions[complexColumnIndex];
-            if (previous == null || complexDeletion.supersedes(previous))
-            {
-                complexDeletions[complexColumnIndex] = complexDeletion;
-                if (previous == null)
-                    ++columnsWithComplexDeletion;
-            }
-        }
-
-        protected Row buildInternal()
-        {
-            // First, the easy cases
-            if (cells.isEmpty())
-                return buildNoCells();
-
-            // Cells have been added in an unsorted way, so sort them first
-            Collections.sort(cells, Cell.comparator);
-
-            // We now need to
-            //  1) merge equal cells together
-            //  2) group the cells for a given complex column together, and include their potential complex deletion time.
-            //     And this without forgetting that some complex columns may have a complex deletion but not cells.
-
-            int addedColumns = countAddedColumns();
-            ColumnData[] data = new ColumnData[addedColumns];
-
-            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
-            ColumnDefinition previousColumn = null;
-
-            int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
-            int i = 0;
-            int size = 0;
-            while (i < cells.size())
-            {
-                Cell cell = cells.get(i++);
-                ColumnDefinition column = cell.column();
-                if (column.isSimple())
-                {
-                    // Either it's a cell for the same column than our previous cell and we merge them together, or it's a new column
-                    if (previousColumn != null && previousColumn.compareTo(column) == 0)
-                        data[size - 1] = Cells.reconcile((Cell)data[size - 1], cell, nowInSec);
-                    else
-                        data[size++] = cell;
-                }
-                else
-                {
-                    // First, collect the complex deletion time for the column we got the first complex column of. We'll
-                    // also find if there is columns that sorts before but had only a complex deletion and add them.
-                    DeletionTime complexDeletion = DeletionTime.LIVE;
-                    while (nextComplexWithDeletion >= 0)
-                    {
-                        int cmp = column.compareTo(columns.getComplex(nextComplexWithDeletion));
-                        if (cmp < 0)
-                        {
-                            // This is after the column we're gonna add cell for. We'll deal with it later
-                            break;
-                        }
-                        else if (cmp > 0)
-                        {
-                            // We have a column that only has a complex deletion and no column. Add its data first
-                            data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
-                            minDeletionTime = Integer.MIN_VALUE;
-                            nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                        }
-                        else // cmp == 0
-                        {
-                            // This is the column we'll about to add cell for. Record the deletion time and break to the cell addition
-                            complexDeletion = complexDeletions[nextComplexWithDeletion];
-                            minDeletionTime = Integer.MIN_VALUE;
-                            nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                            break;
-                        }
-                    }
-
-                    // Find how many cells the complex column has (cellCount) and the index of the next cell that doesn't belong to it (nextColumnIdx).
-                    int nextColumnIdx = i; // i is on cell following the current one
-                    int cellCount = 1; // We have at least the current cell
-                    Cell previousCell = cell;
-                    while (nextColumnIdx < cells.size())
-                    {
-                        Cell newCell = cells.get(nextColumnIdx);
-                        if (column.compareTo(newCell.column()) != 0)
-                            break;
-
-                        ++nextColumnIdx;
-                        if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) != 0)
-                            ++cellCount;
-                        previousCell = newCell;
-                    }
-                    Cell[] columnCells = new Cell[cellCount];
-                    int complexSize = 0;
-                    columnCells[complexSize++] = cell;
-                    previousCell = cell;
-                    for (int j = i; j < nextColumnIdx; j++)
-                    {
-                        Cell newCell = cells.get(j);
-                        // Either it's a cell for the same path than our previous cell and we merge them together, or it's a new path
-                        if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) == 0)
-                            columnCells[complexSize - 1] = Cells.reconcile(previousCell, newCell, nowInSec);
-                        else
-                            columnCells[complexSize++] = newCell;
-                        previousCell = newCell;
-                    }
-                    i = nextColumnIdx;
-
-                    data[size++] = new ComplexColumnData(column, columnCells, complexDeletion);
-                }
-                previousColumn = column;
-            }
-            // We may still have some complex columns with only a complex deletion
-            while (nextComplexWithDeletion >= 0)
-            {
-                data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
-                nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                minDeletionTime = Integer.MIN_VALUE;
-            }
-            assert size == addedColumns;
-
-            // Reconciliation made it harder to compute minDeletionTime for cells in the loop above, so just do it now if we need to.
-            if (minDeletionTime != Integer.MIN_VALUE)
-            {
-                for (ColumnData cd : data)
-                    minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
-            }
-
-            return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
-        }
-
-        private int findNextComplexWithDeletion(int from)
-        {
-            for (int i = from; i < complexDeletions.length; i++)
-            {
-                if (complexDeletions[i] != null)
-                    return i;
-            }
-            return -1;
-        }
-
-        // Should only be called once the cells have been sorted
-        private int countAddedColumns()
-        {
-            int columnCount = 0;
-            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
-            ColumnDefinition previousColumn = null;
-            for (Cell cell : cells)
-            {
-                if (previousColumn != null && previousColumn.compareTo(cell.column()) == 0)
-                    continue;
-
-                ++columnCount;
-                previousColumn = cell.column();
-
-                // We know that simple columns sort before the complex ones, so don't bother with the column having complex deletion
-                // until we've reached the cells of complex columns.
-                if (!previousColumn.isComplex())
-                    continue;
-
-                while (nextComplexWithDeletion >= 0)
-                {
-                    // Check how the column we just counted compared to the next with complex deletion
-                    int cmp = previousColumn.compareTo(columns.getComplex(nextComplexWithDeletion));
-                    if (cmp < 0)
-                    {
-                        // it's before, we'll handle nextColumnWithComplexDeletion later
-                        break;
-                    }
-                    else if (cmp > 0)
-                    {
-                        // it's after. nextColumnWithComplexDeletion has no cell but we should count it
-                        ++columnCount;
-                        nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                    }
-                    else // cmp == 0
-                    {
-                        // it's the column we just counted. Ignore it and we know we're good with nextComplexWithDeletion for this loop
-                        nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-                        break;
-                    }
-                }
-            }
-            // Anything remaining in complexDeletionColumns are complex columns with no cells but some complex deletion
-            while (nextComplexWithDeletion >= 0)
-            {
-                ++columnCount;
-                nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
-            }
-            return columnCount;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
new file mode 100644
index 0000000..7e9ceb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.base.Function;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class BTreeBackedRow extends AbstractRow
+{
+    private static final ColumnData[] NO_DATA = new ColumnData[0];
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY));
+
+    private final Clustering clustering;
+    private final Columns columns;
+    private final LivenessInfo primaryKeyLivenessInfo;
+    private final DeletionTime deletion;
+
+    // The data for each columns present in this row in column sorted order.
+    private final Object[] btree;
+
+    // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
+    // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
+    // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
+    // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
+    // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
+    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
+    // no expiring cells, this will be Integer.MAX_VALUE;
+    private final int minLocalDeletionTime;
+
+    private BTreeBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+    {
+        this.clustering = clustering;
+        this.columns = columns;
+        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+        this.deletion = deletion;
+        this.btree = btree;
+        this.minLocalDeletionTime = minLocalDeletionTime;
+    }
+
+    // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
+    public static BTreeBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+    {
+        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+        if (minDeletionTime != Integer.MIN_VALUE)
+        {
+            for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+                minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
+        }
+
+        return new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+    }
+
+    public static BTreeBackedRow emptyRow(Clustering clustering)
+    {
+        return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
+    }
+
+    public static BTreeBackedRow singleCellRow(Clustering clustering, Cell cell)
+    {
+        if (cell.column().isSimple())
+            return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+
+        ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
+        return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+    }
+
+    public static BTreeBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    {
+        assert !deletion.isLive();
+        return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
+    }
+
+    public static BTreeBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
+    {
+        assert !primaryKeyLivenessInfo.isEmpty();
+        return new BTreeBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+    }
+
+    private static int minDeletionTime(Cell cell)
+    {
+        return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
+    }
+
+    private static int minDeletionTime(LivenessInfo info)
+    {
+        return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
+    }
+
+    private static int minDeletionTime(DeletionTime dt)
+    {
+        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+    }
+
+    private static int minDeletionTime(ComplexColumnData cd)
+    {
+        int min = minDeletionTime(cd.complexDeletion());
+        for (Cell cell : cd)
+        {
+            min = Math.min(min, minDeletionTime(cell));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    private static int minDeletionTime(ColumnData cd)
+    {
+        return cd.column().isSimple() ? minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd);
+    }
+
+    private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
+    {
+        int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+        {
+            min = Math.min(min, minDeletionTime(cd));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return primaryKeyLivenessInfo;
+    }
+
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+        ComplexColumnData cd = getComplexColumnData(c);
+        if (cd == null)
+            return null;
+        return cd.getCell(path);
+    }
+
+    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Iterator<ColumnData> iterator()
+    {
+        return searchIterator();
+    }
+
+    public Iterable<Cell> cells()
+    {
+        return CellIterator::new;
+    }
+
+    public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+    }
+
+    public Row filter(ColumnFilter filter, CFMetaData metadata)
+    {
+        return filter(filter, DeletionTime.LIVE, false, metadata);
+    }
+
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
+    {
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
+
+        if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+            return this;
+
+        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+        LivenessInfo newInfo = primaryKeyLivenessInfo;
+        DeletionTime newDeletion = deletion;
+        if (mayHaveShadowed)
+        {
+            if (activeDeletion.deletes(newInfo.timestamp()))
+                newInfo = LivenessInfo.EMPTY;
+            // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
+            // the row deletion is shadowed and we shouldn't return it.
+            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
+        }
+
+        Columns columns = filter.fetchedColumns().columns(isStatic());
+        Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
+        return transformAndFilter(newInfo, newDeletion, (cd) -> {
+
+            ColumnDefinition column = cd.column();
+            if (!inclusionTester.test(column))
+                return null;
+
+            CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+            if (column.isComplex())
+                return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+
+            Cell cell = (Cell)cd;
+            return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))
+                   ? cell : null;
+        });
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        // We start by the end cause we know complex columns sort before simple ones
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
+        {
+            if (cd.column().isSimple())
+                return false;
+
+            if (!((ComplexColumnData)cd).complexDeletion().isLive())
+                return true;
+        }
+        return false;
+    }
+
+    public Row markCounterLocalToBeCleared()
+    {
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+                                                                            ? cd.markCounterLocalToBeCleared()
+                                                                            : cd);
+    }
+
+    public boolean hasDeletion(int nowInSec)
+    {
+        return nowInSec >= minLocalDeletionTime;
+    }
+
+    /**
+     * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and
+     * all deletion timestamp by {@code newTimestamp - 1}.
+     *
+     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+     */
+    public Row updateAllTimestamp(long newTimestamp)
+    {
+        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
+        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
+    }
+
+    public Row purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!hasDeletion(nowInSec))
+            return this;
+
+        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
+        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
+    }
+
+    private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+    {
+        Object[] transformed = BTree.transformAndFilter(btree, function);
+
+        if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
+            return this;
+
+        if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
+            return null;
+
+        int minDeletionTime = minDeletionTime(transformed, info, deletion);
+        return new BTreeBackedRow(clustering, columns, info, deletion, transformed, minDeletionTime);
+    }
+
+    public int dataSize()
+    {
+        int dataSize = clustering.dataSize()
+                     + primaryKeyLivenessInfo.dataSize()
+                     + deletion.dataSize();
+
+        for (ColumnData cd : this)
+            dataSize += cd.dataSize();
+        return dataSize;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long heapSize = EMPTY_SIZE
+                      + clustering.unsharedHeapSizeExcludingData()
+                      + BTree.sizeOfStructureOnHeap(btree);
+
+        for (ColumnData cd : this)
+            heapSize += cd.unsharedHeapSizeExcludingData();
+        return heapSize;
+    }
+
+    public static Row.Builder sortedBuilder(Columns columns)
+    {
+        return new Builder(columns, true);
+    }
+
+    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+    {
+        return new Builder(columns, false, nowInSec);
+    }
+
+    // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
+    // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
+    // This method is in particular not exposed by the Row API on purpose.
+    // This method also *assumes* that the cell we're setting already exists.
+    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column);
+        if (column.isSimple())
+            BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value));
+        else
+            ((ComplexColumnData) current).setValue(path, value);
+    }
+
+    private class CellIterator extends AbstractIterator<Cell>
+    {
+        private Iterator<ColumnData> columnData = iterator();
+        private Iterator<Cell> complexCells;
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (!columnData.hasNext())
+                    return endOfData();
+
+                ColumnData cd = columnData.next();
+                if (cd.column().isComplex())
+                    complexCells = ((ComplexColumnData)cd).iterator();
+                else
+                    return (Cell)cd;
+            }
+        }
+    }
+
+    public static class Builder implements Row.Builder
+    {
+        // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
+        private static class ComplexColumnDeletion extends BufferCell
+        {
+            public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime)
+            {
+                super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM);
+            }
+        }
+
+        // converts a run of Cell with equal column into a ColumnData
+        private static class CellResolver implements BTree.Builder.Resolver
+        {
+            final int nowInSec;
+            private CellResolver(int nowInSec)
+            {
+                this.nowInSec = nowInSec;
+            }
+
+            public ColumnData resolve(Object[] cells, int lb, int ub)
+            {
+                Cell cell = (Cell) cells[lb];
+                ColumnDefinition column = cell.column;
+                if (cell.column.isSimple())
+                {
+                    assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
+                    while (++lb < ub)
+                        cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec);
+                    return cell;
+                }
+
+                // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are
+                // bedded in, as less important; galloping makes it pretty cheap anyway)
+                Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator());
+                cell = (Cell) cells[lb];
+                DeletionTime deletion = DeletionTime.LIVE;
+                if (cell instanceof ComplexColumnDeletion)
+                {
+                    // TODO: do we need to be robust to multiple of these being provided?
+                    deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime());
+                    lb++;
+                }
+
+                List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+                Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
+                return new ComplexColumnData(column, btree, deletion);
+            }
+
+        };
+        protected final Columns columns;
+
+        protected Clustering clustering;
+        protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+        protected DeletionTime deletion = DeletionTime.LIVE;
+
+        private final boolean isSorted;
+        private final BTree.Builder<Cell> cells;
+        private final CellResolver resolver;
+        private boolean hasComplex = false;
+
+        // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
+
+        protected Builder(Columns columns, boolean isSorted)
+        {
+            this(columns, isSorted, Integer.MIN_VALUE);
+        }
+
+        protected Builder(Columns columns, boolean isSorted, int nowInSecs)
+        {
+            this.columns = columns;
+            this.cells = BTree.builder(ColumnData.comparator);
+            resolver = new CellResolver(nowInSecs);
+            this.isSorted = isSorted;
+            this.cells.auto(false);
+        }
+
+        public boolean isSorted()
+        {
+            return isSorted;
+        }
+
+        public void newRow(Clustering clustering)
+        {
+            assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
+            this.clustering = clustering;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        protected void reset()
+        {
+            this.clustering = null;
+            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+            this.deletion = DeletionTime.LIVE;
+            this.cells.reuse();
+        }
+
+        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+        {
+            this.primaryKeyLivenessInfo = info;
+        }
+
+        public void addRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        public void addCell(Cell cell)
+        {
+            assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
+            cells.add(cell);
+            hasComplex |= cell.column.isComplex();
+        }
+
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+        {
+            cells.add(new ComplexColumnDeletion(column, complexDeletion));
+            hasComplex = true;
+        }
+
+        public Row build()
+        {
+            if (!isSorted)
+                cells.sort();
+            // we can avoid resolving if we're sorted and have no complex values
+            // (because we'll only have unique simple cells, which are already in their final condition)
+            if (!isSorted | hasComplex)
+                cells.resolve(resolver);
+            Object[] btree = cells.build();
+            int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+            Row row = new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+            reset();
+            return row;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e952748..81c42d4 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -36,8 +36,6 @@ public class BufferCell extends AbstractCell
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
 
-    private final ColumnDefinition column;
-
     private final long timestamp;
     private final int ttl;
     private final int localDeletionTime;
@@ -47,8 +45,8 @@ public class BufferCell extends AbstractCell
 
     public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
     {
+        super(column);
         assert column.isComplex() == (path != null);
-        this.column = column;
         this.timestamp = timestamp;
         this.ttl = ttl;
         this.localDeletionTime = localDeletionTime;
@@ -90,11 +88,6 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
     }
 
-    public ColumnDefinition column()
-    {
-        return column;
-    }
-
     public boolean isCounterCell()
     {
         return !isTombstone() && column.cellValueType().isCounter();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index ccb9708..1820de2 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  *   2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they are expired).
  *   3) tombstone cells: those won't have value, but they have a local deletion time (when the tombstone was created).
  */
-public interface Cell extends ColumnData
+public abstract class Cell extends ColumnData
 {
     public static final int NO_TTL = 0;
     public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
@@ -51,35 +51,40 @@ public interface Cell extends ColumnData
         return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path());
     };
 
-    public final Serializer serializer = new BufferCell.Serializer();
+    public static final Serializer serializer = new BufferCell.Serializer();
+
+    protected Cell(ColumnDefinition column)
+    {
+        super(column);
+    }
 
     /**
      * Whether the cell is a counter cell or not.
      *
      * @return whether the cell is a counter cell or not.
      */
-    public boolean isCounterCell();
+    public abstract boolean isCounterCell();
 
     /**
      * The cell value.
      *
      * @return the cell value.
      */
-    public ByteBuffer value();
+    public abstract ByteBuffer value();
 
     /**
      * The cell timestamp.
      * <p>
      * @return the cell timestamp.
      */
-    public long timestamp();
+    public abstract long timestamp();
 
     /**
      * The cell ttl.
      *
      * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one.
      */
-    public int ttl();
+    public abstract int ttl();
 
     /**
      * The cell local deletion time.
@@ -87,14 +92,14 @@ public interface Cell extends ColumnData
      * @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither
      * a tombstone nor an expiring one.
      */
-    public int localDeletionTime();
+    public abstract int localDeletionTime();
 
     /**
      * Whether the cell is a tombstone or not.
      *
      * @return whether the cell is a tombstone or not.
      */
-    public boolean isTombstone();
+    public abstract boolean isTombstone();
 
     /**
      * Whether the cell is an expiring one or not.
@@ -105,7 +110,7 @@ public interface Cell extends ColumnData
      *
      * @return whether the cell is an expiring one or not.
      */
-    public boolean isExpiring();
+    public abstract boolean isExpiring();
 
     /**
      * Whether the cell is live or not given the current time.
@@ -114,7 +119,7 @@ public interface Cell extends ColumnData
      * decide if an expiring cell is expired or live.
      * @return whether the cell is live or not at {@code nowInSec}.
      */
-    public boolean isLive(int nowInSec);
+    public abstract boolean isLive(int nowInSec);
 
     /**
      * For cells belonging to complex types (non-frozen collection and UDT), the
@@ -122,19 +127,19 @@ public interface Cell extends ColumnData
      *
      * @return the cell path for cells of complex column, and {@code null} for other cells.
      */
-    public CellPath path();
+    public abstract CellPath path();
 
-    public Cell withUpdatedValue(ByteBuffer newValue);
+    public abstract Cell withUpdatedValue(ByteBuffer newValue);
 
-    public Cell copy(AbstractAllocator allocator);
+    public abstract Cell copy(AbstractAllocator allocator);
 
     @Override
     // Overrides super type to provide a more precise return type.
-    public Cell markCounterLocalToBeCleared();
+    public abstract Cell markCounterLocalToBeCleared();
 
     @Override
     // Overrides super type to provide a more precise return type.
-    public Cell purge(DeletionPurger purger, int nowInSec);
+    public abstract Cell purge(DeletionPurger purger, int nowInSec);
 
     public interface Serializer
     {