You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/12 23:53:41 UTC

[2/9] cassandra git commit: Remove ArrayBackedPartition and hierarchy

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
new file mode 100644
index 0000000..7e50716
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class BTreeRow extends AbstractRow
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY));
+
+    private final Clustering clustering;
+    private final Columns columns;
+    private final LivenessInfo primaryKeyLivenessInfo;
+    private final DeletionTime deletion;
+
+    // The data for each columns present in this row in column sorted order.
+    private final Object[] btree;
+
+    // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove
+    // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
+    // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
+    // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given
+    // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to
+    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and
+    // no expiring cells, this will be Integer.MAX_VALUE;
+    private final int minLocalDeletionTime;
+
+    private BTreeRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+    {
+        this.clustering = clustering;
+        this.columns = columns;
+        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+        this.deletion = deletion;
+        this.btree = btree;
+        this.minLocalDeletionTime = minLocalDeletionTime;
+    }
+
+    // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
+    public static BTreeRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+    {
+        int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+        if (minDeletionTime != Integer.MIN_VALUE)
+        {
+            for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+                minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
+        }
+
+        return new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+    }
+
+    public static BTreeRow emptyRow(Clustering clustering)
+    {
+        return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
+    }
+
+    public static BTreeRow singleCellRow(Clustering clustering, Cell cell)
+    {
+        if (cell.column().isSimple())
+            return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+
+        ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
+        return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+    }
+
+    public static BTreeRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    {
+        assert !deletion.isLive();
+        return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
+    }
+
+    public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
+    {
+        assert !primaryKeyLivenessInfo.isEmpty();
+        return new BTreeRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+    }
+
+    private static int minDeletionTime(Cell cell)
+    {
+        return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
+    }
+
+    private static int minDeletionTime(LivenessInfo info)
+    {
+        return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
+    }
+
+    private static int minDeletionTime(DeletionTime dt)
+    {
+        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+    }
+
+    private static int minDeletionTime(ComplexColumnData cd)
+    {
+        int min = minDeletionTime(cd.complexDeletion());
+        for (Cell cell : cd)
+        {
+            min = Math.min(min, minDeletionTime(cell));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    private static int minDeletionTime(ColumnData cd)
+    {
+        return cd.column().isSimple() ? minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd);
+    }
+
+    private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
+    {
+        int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+        {
+            min = Math.min(min, minDeletionTime(cd));
+            if (min == Integer.MIN_VALUE)
+                break;
+        }
+        return min;
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return primaryKeyLivenessInfo;
+    }
+
+    public boolean isEmpty()
+    {
+        return primaryKeyLivenessInfo().isEmpty()
+               && deletion().isLive()
+               && BTree.isEmpty(btree);
+    }
+
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+        ComplexColumnData cd = getComplexColumnData(c);
+        if (cd == null)
+            return null;
+        return cd.getCell(path);
+    }
+
+    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+    }
+
+    public Iterator<ColumnData> iterator()
+    {
+        return searchIterator();
+    }
+
+    public Iterable<Cell> cells()
+    {
+        return CellIterator::new;
+    }
+
+    public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+    }
+
+    public Row filter(ColumnFilter filter, CFMetaData metadata)
+    {
+        return filter(filter, DeletionTime.LIVE, false, metadata);
+    }
+
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
+    {
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
+
+        if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+            return this;
+
+        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+        LivenessInfo newInfo = primaryKeyLivenessInfo;
+        DeletionTime newDeletion = deletion;
+        if (mayHaveShadowed)
+        {
+            if (activeDeletion.deletes(newInfo.timestamp()))
+                newInfo = LivenessInfo.EMPTY;
+            // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow,
+            // the row deletion is shadowed and we shouldn't return it.
+            newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
+        }
+
+        Columns columns = filter.fetchedColumns().columns(isStatic());
+        Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
+        return transformAndFilter(newInfo, newDeletion, (cd) -> {
+
+            ColumnDefinition column = cd.column();
+            if (!inclusionTester.test(column))
+                return null;
+
+            CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+            if (column.isComplex())
+                return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+
+            Cell cell = (Cell)cd;
+            return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))
+                   ? cell : null;
+        });
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        // We start by the end cause we know complex columns sort before simple ones
+        for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
+        {
+            if (cd.column().isSimple())
+                return false;
+
+            if (!((ComplexColumnData)cd).complexDeletion().isLive())
+                return true;
+        }
+        return false;
+    }
+
+    public Row markCounterLocalToBeCleared()
+    {
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+                                                                            ? cd.markCounterLocalToBeCleared()
+                                                                            : cd);
+    }
+
+    public boolean hasDeletion(int nowInSec)
+    {
+        return nowInSec >= minLocalDeletionTime;
+    }
+
+    /**
+     * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and
+     * all deletion timestamp by {@code newTimestamp - 1}.
+     *
+     * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+     */
+    public Row updateAllTimestamp(long newTimestamp)
+    {
+        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
+        DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
+    }
+
+    public Row purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!hasDeletion(nowInSec))
+            return this;
+
+        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
+        DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
+
+        return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
+    }
+
+    private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+    {
+        Object[] transformed = BTree.transformAndFilter(btree, function);
+
+        if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
+            return this;
+
+        if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
+            return null;
+
+        int minDeletionTime = minDeletionTime(transformed, info, deletion);
+        return new BTreeRow(clustering, columns, info, deletion, transformed, minDeletionTime);
+    }
+
+    public int dataSize()
+    {
+        int dataSize = clustering.dataSize()
+                     + primaryKeyLivenessInfo.dataSize()
+                     + deletion.dataSize();
+
+        for (ColumnData cd : this)
+            dataSize += cd.dataSize();
+        return dataSize;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long heapSize = EMPTY_SIZE
+                      + clustering.unsharedHeapSizeExcludingData()
+                      + BTree.sizeOfStructureOnHeap(btree);
+
+        for (ColumnData cd : this)
+            heapSize += cd.unsharedHeapSizeExcludingData();
+        return heapSize;
+    }
+
+    public static Row.Builder sortedBuilder(Columns columns)
+    {
+        return new Builder(columns, true);
+    }
+
+    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+    {
+        return new Builder(columns, false, nowInSec);
+    }
+
+    // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
+    // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
+    // This method is in particular not exposed by the Row API on purpose.
+    // This method also *assumes* that the cell we're setting already exists.
+    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
+    {
+        ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column);
+        if (column.isSimple())
+            BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value));
+        else
+            ((ComplexColumnData) current).setValue(path, value);
+    }
+
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+    {
+        return () -> new CellInLegacyOrderIterator(metadata);
+    }
+
+    private class CellIterator extends AbstractIterator<Cell>
+    {
+        private Iterator<ColumnData> columnData = iterator();
+        private Iterator<Cell> complexCells;
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (!columnData.hasNext())
+                    return endOfData();
+
+                ColumnData cd = columnData.next();
+                if (cd.column().isComplex())
+                    complexCells = ((ComplexColumnData)cd).iterator();
+                else
+                    return (Cell)cd;
+            }
+        }
+    }
+
+    private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
+    {
+        private final AbstractType<?> comparator;
+        private final int firstComplexIdx;
+        private int simpleIdx;
+        private int complexIdx;
+        private Iterator<Cell> complexCells;
+        private final Object[] data;
+
+        private CellInLegacyOrderIterator(CFMetaData metadata)
+        {
+            this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+
+            // copy btree into array for simple separate iteration of simple and complex columns
+            this.data = new Object[BTree.size(btree)];
+            BTree.toArray(btree, data, 0);
+
+            int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData);
+            this.firstComplexIdx = idx < 0 ? data.length : idx;
+            this.complexIdx = firstComplexIdx;
+        }
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (simpleIdx >= firstComplexIdx)
+                {
+                    if (complexIdx >= data.length)
+                        return endOfData();
+
+                    complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+                else
+                {
+                    if (complexIdx >= data.length)
+                        return (Cell)data[simpleIdx++];
+
+                    if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
+                        return (Cell)data[simpleIdx++];
+                    else
+                        complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+            }
+        }
+    }
+
+    public static class Builder implements Row.Builder
+    {
+        // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
+        private static class ComplexColumnDeletion extends BufferCell
+        {
+            public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime)
+            {
+                super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM);
+            }
+        }
+
+        // converts a run of Cell with equal column into a ColumnData
+        private static class CellResolver implements BTree.Builder.Resolver
+        {
+            final int nowInSec;
+            private CellResolver(int nowInSec)
+            {
+                this.nowInSec = nowInSec;
+            }
+
+            public ColumnData resolve(Object[] cells, int lb, int ub)
+            {
+                Cell cell = (Cell) cells[lb];
+                ColumnDefinition column = cell.column;
+                if (cell.column.isSimple())
+                {
+                    assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
+                    while (++lb < ub)
+                        cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec);
+                    return cell;
+                }
+
+                // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are
+                // bedded in, as less important; galloping makes it pretty cheap anyway)
+                Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator());
+                cell = (Cell) cells[lb];
+                DeletionTime deletion = DeletionTime.LIVE;
+                if (cell instanceof ComplexColumnDeletion)
+                {
+                    // TODO: do we need to be robust to multiple of these being provided?
+                    deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime());
+                    lb++;
+                }
+
+                List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+                Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
+                return new ComplexColumnData(column, btree, deletion);
+            }
+
+        };
+        protected final Columns columns;
+
+        protected Clustering clustering;
+        protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+        protected DeletionTime deletion = DeletionTime.LIVE;
+
+        private final boolean isSorted;
+        private final BTree.Builder<Cell> cells;
+        private final CellResolver resolver;
+        private boolean hasComplex = false;
+
+        // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
+
+        protected Builder(Columns columns, boolean isSorted)
+        {
+            this(columns, isSorted, Integer.MIN_VALUE);
+        }
+
+        protected Builder(Columns columns, boolean isSorted, int nowInSecs)
+        {
+            this.columns = columns;
+            this.cells = BTree.builder(ColumnData.comparator);
+            resolver = new CellResolver(nowInSecs);
+            this.isSorted = isSorted;
+            this.cells.auto(false);
+        }
+
+        public boolean isSorted()
+        {
+            return isSorted;
+        }
+
+        public void newRow(Clustering clustering)
+        {
+            assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
+            this.clustering = clustering;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        protected void reset()
+        {
+            this.clustering = null;
+            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+            this.deletion = DeletionTime.LIVE;
+            this.cells.reuse();
+        }
+
+        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+        {
+            this.primaryKeyLivenessInfo = info;
+        }
+
+        public void addRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        public void addCell(Cell cell)
+        {
+            assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
+            cells.add(cell);
+            hasComplex |= cell.column.isComplex();
+        }
+
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+        {
+            cells.add(new ComplexColumnDeletion(column, complexDeletion));
+            hasComplex = true;
+        }
+
+        public Row build()
+        {
+            if (!isSorted)
+                cells.sort();
+            // we can avoid resolving if we're sorted and have no complex values
+            // (because we'll only have unique simple cells, which are already in their final condition)
+            if (!isSorted | hasComplex)
+                cells.resolve(resolver);
+            Object[] btree = cells.build();
+            int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+            Row row = new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+            reset();
+            return row;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index ca62c47..efa40ad 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -240,6 +240,17 @@ public class EncodingStats
                                      isTTLSet ? minTTL : TTL_EPOCH,
                                      isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
         }
+
+        public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
+        {
+            Collector collector = new Collector();
+            deletionInfo.collectStats(collector);
+            if (!staticRow.isEmpty())
+                Rows.collectStats(staticRow, collector);
+            while (rows.hasNext())
+                Rows.collectStats(rows.next(), collector);
+            return collector.get();
+        }
     }
 
     public static class Serializer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 33ad447..996e89a 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -395,7 +395,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
             // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
             return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
                  ? null
-                 : BTreeBackedRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
+                 : BTreeRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
         }
 
         public Clustering mergedClustering()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index 766cf19..30f5c50 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -49,7 +49,12 @@ public abstract class RowIterators
             iterator.next().digest(digest);
     }
 
-    public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    public static RowIterator emptyIterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder)
+    {
+        return iterator(cfm, partitionKey, isReverseOrder, Collections.emptyIterator());
+    }
+
+    public static RowIterator iterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder, Iterator<Row> iterator)
     {
         return new RowIterator()
         {
@@ -78,23 +83,16 @@ public abstract class RowIterators
                 return Rows.EMPTY_STATIC_ROW;
             }
 
+            public void close() { }
+
             public boolean hasNext()
             {
-                return false;
+                return iterator.hasNext();
             }
 
             public Row next()
             {
-                throw new NoSuchElementException();
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
-            {
+                return iterator.next();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index bacd591..0b739a8 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -49,7 +49,7 @@ public abstract class Rows
 
     private Rows() {}
 
-    public static final Row EMPTY_STATIC_ROW = BTreeBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
+    public static final Row EMPTY_STATIC_ROW = BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING);
 
     public static Row.Builder copy(Row row, Row.Builder builder)
     {
@@ -217,7 +217,7 @@ public abstract class Rows
     public static Row merge(Row row1, Row row2, int nowInSec)
     {
         Columns mergedColumns = row1.columns().mergeTo(row2.columns());
-        Row.Builder builder = BTreeBackedRow.sortedBuilder(mergedColumns);
+        Row.Builder builder = BTreeRow.sortedBuilder(mergedColumns);
         merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index f17ccca..2102534 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer
         final SerializationHeader sHeader = header.sHeader;
         return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
         {
-            private final Row.Builder builder = BTreeBackedRow.sortedBuilder(sHeader.columns().regulars);
+            private final Row.Builder builder = BTreeRow.sortedBuilder(sHeader.columns().regulars);
 
             protected Unfiltered computeNext()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 650a18d..60f0dcb 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -89,10 +89,14 @@ public abstract class UnfilteredRowIterators
         return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
     }
 
+    public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    {
+        return noRowsIterator(cfm, partitionKey, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, isReverseOrder);
+    }
     /**
      * Returns an empty atom iterator for a given partition.
      */
-    public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+    public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
     {
         return new UnfilteredRowIterator()
         {
@@ -118,12 +122,12 @@ public abstract class UnfilteredRowIterators
 
             public DeletionTime partitionLevelDeletion()
             {
-                return DeletionTime.LIVE;
+                return partitionDeletion;
             }
 
             public Row staticRow()
             {
-                return Rows.EMPTY_STATIC_ROW;
+                return staticRow;
             }
 
             public EncodingStats stats()
@@ -225,7 +229,7 @@ public abstract class UnfilteredRowIterators
             @Override
             protected Row computeNextStatic(Row row)
             {
-                Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics);
+                Row.Builder staticBuilder = allocator.cloningBTreeRowBuilder(columns().statics);
                 return Rows.copy(row, staticBuilder).build();
             }
 
@@ -233,7 +237,7 @@ public abstract class UnfilteredRowIterators
             protected Row computeNext(Row row)
             {
                 if (regularBuilder == null)
-                    regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars);
+                    regularBuilder = allocator.cloningBTreeRowBuilder(columns().regulars);
 
                 return Rows.copy(row, regularBuilder).build();
             }
@@ -541,7 +545,7 @@ public abstract class UnfilteredRowIterators
                 {
                     Row merged = rowMerger.merge(markerMerger.activeDeletion());
                     if (listener != null)
-                        listener.onMergedRows(merged == null ? BTreeBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+                        listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
                     return merged;
                 }
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index f306e6d..14b06cf 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -341,7 +341,7 @@ public class UnfilteredSerializer
     {
         int flags = in.readUnsignedByte();
         assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
-        Row.Builder builder = BTreeBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
+        Row.Builder builder = BTreeRow.sortedBuilder(helper.fetchedStaticColumns(header));
         builder.newRow(Clustering.STATIC_CLUSTERING);
         return deserializeRowBody(in, header, helper, flags, builder);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 39b2769..7337e4b 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -52,10 +52,10 @@ import org.apache.cassandra.db.ReadOrderGroup;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.Slice;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -208,7 +208,7 @@ public class MaterializedView
      *
      * @return true if {@param partition} modifies a column included in the view
      */
-    public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
+    public boolean updateAffectsView(AbstractBTreePartition partition)
     {
         // If we are including all of the columns, then any update will be included
         if (includeAll)
@@ -268,7 +268,7 @@ public class MaterializedView
                                             int nowInSec)
     {
         CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+        Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
         builder.newRow(viewClustering(temporalRow, resolver));
         builder.addRowDeletion(deletionTime);
         return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
@@ -286,7 +286,7 @@ public class MaterializedView
     {
 
         CFMetaData viewCfm = getViewCfs().metadata;
-        Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+        Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
         builder.newRow(viewClustering(temporalRow, resolver));
         builder.addComplexDeletion(deletedColumn, deletionTime);
         return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
@@ -363,7 +363,7 @@ public class MaterializedView
             return null;
         }
 
-        Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
+        Row.Builder regularBuilder = BTreeRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
 
         CBuilder clustering = CBuilder.create(viewCfs.getComparator());
         for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
@@ -395,7 +395,7 @@ public class MaterializedView
      * @return    View Tombstones which delete all of the rows which have been removed from the base table with
      *            {@param partition}
      */
-    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
+    private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
     {
         final TemporalRow.Resolver resolver = TemporalRow.earliest;
 
@@ -558,7 +558,7 @@ public class MaterializedView
     /**
      * @return Set of rows which are contained in the partition update {@param partition}
      */
-    private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
+    private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition)
     {
         Set<ColumnIdentifier> columns = new HashSet<>();
         for (ColumnDefinition def : this.columns.primaryKeyDefs)
@@ -578,7 +578,7 @@ public class MaterializedView
      *         have been applied successfully. This is based solely on the changes that are necessary given the current
      *         state of the base table and the newly applying partition data.
      */
-    public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
+    public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding)
     {
         if (!updateAffectsView(partition))
             return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index d0ba5ea..00fdf48 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -45,7 +45,7 @@ import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.Slice;
 import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
@@ -309,7 +309,7 @@ public class TemporalRow
         return null;
     }
 
-    public DeletionTime deletionTime(AbstractThreadUnsafePartition partition)
+    public DeletionTime deletionTime(AbstractBTreePartition partition)
     {
         DeletionInfo deletionInfo = partition.deletionInfo();
         if (!deletionInfo.getPartitionDeletion().isLive())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 65a5259..7382e5e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -71,7 +71,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
         {
             super(metadata, in, helper);
             this.header = header;
-            this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+            this.builder = BTreeRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public Row readStaticRow() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index c9d9fa5..9213b20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -35,7 +34,6 @@ import javax.management.ObjectName;
 import com.google.common.util.concurrent.Futures;
 
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +48,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
+import org.apache.cassandra.db.partitions.CachedBTreePartition;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -431,7 +429,7 @@ public class CacheService implements CacheServiceMBean
                     int nowInSec = FBUtilities.nowInSeconds();
                     try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
                     {
-                        CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+                        CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
                         return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 6bfe94a..3dc323e 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -214,7 +214,7 @@ public class DataResolver extends ResponseResolver
             {
                 if (currentRows[i] == null)
                 {
-                    currentRows[i] = BTreeBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+                    currentRows[i] = BTreeRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
                     currentRows[i].newRow(clustering);
                 }
                 return currentRows[i];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 733067e..cb0667a 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -824,7 +824,7 @@ public class CassandraServer implements Cassandra.Iface
         {
             LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
             Cell cell = cellFromColumn(metadata, name, column);
-            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
+            PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
 
             mutation = new org.apache.cassandra.db.Mutation(update);
         }
@@ -1329,7 +1329,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         else if (column_path.super_column != null && column_path.column == null)
         {
-            Row row = BTreeBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+            Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
             update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
         }
         else
@@ -1339,7 +1339,7 @@ public class CassandraServer implements Cassandra.Iface
                 LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
                 Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path);
-                update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
+                update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeRow.singleCellRow(name.clustering, cell));
             }
             catch (UnknownColumnException e)
             {
@@ -2138,7 +2138,7 @@ public class CassandraServer implements Cassandra.Iface
                 CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
                 Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
 
-                PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
+                PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
 
                 org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
                 doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index 09e8d4b..084e835 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -112,7 +112,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             super(results);
             assert results.metadata().isStaticCompactTable();
             this.nowInSec = nowInSec;
-            this.builder = BTreeBackedRow.sortedBuilder(results.columns().regulars);
+            this.builder = BTreeRow.sortedBuilder(results.columns().regulars);
         }
 
         private void init()
@@ -220,7 +220,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
             this.superColumnMapColumn = results.metadata().compactValueColumn();
             assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
 
-            this.builder = BTreeBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
+            this.builder = BTreeRow.sortedBuilder(Columns.of(superColumnMapColumn));
             this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index d75d2f1..40d4094 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -156,7 +156,7 @@ public class TriggerExecutor
         return merged;
     }
 
-    private Collection<PartitionUpdate> validateForSinglePartition(UUID cfId,
+    private List<PartitionUpdate> validateForSinglePartition(UUID cfId,
                                                                    DecoratedKey key,
                                                                    Collection<Mutation> tmutations)
     throws InvalidRequestException
@@ -165,7 +165,7 @@ public class TriggerExecutor
 
         if (tmutations.size() == 1)
         {
-            Collection<PartitionUpdate> updates = Iterables.getOnlyElement(tmutations).getPartitionUpdates();
+            List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates());
             if (updates.size() > 1)
                 throw new InvalidRequestException("The updates generated by triggers are not all for the same partition");
             validateSamePartition(cfId, key, Iterables.getOnlyElement(updates));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 353e7a5..fe08011 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -183,7 +183,7 @@ public class BTree
         return btree;
     }
 
-    public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<K> comparator)
+    public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<? super K> comparator, UpdateFunction<K, K> updateF)
     {
         if (size(tree1) < size(tree2))
         {
@@ -191,7 +191,7 @@ public class BTree
             tree1 = tree2;
             tree2 = tmp;
         }
-        return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), UpdateFunction.<K>noOp());
+        return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), updateF);
     }
 
     public static <V> Iterator<V> iterator(Object[] btree)
@@ -749,8 +749,15 @@ public class BTree
         return new Builder<>(comparator);
     }
 
+    public static <V> Builder<V> builder(Comparator<? super V> comparator, int initialCapacity)
+    {
+        return new Builder<>(comparator);
+    }
+
     public static class Builder<V>
     {
+
+        // a user-defined bulk resolution, to be applied manually via resolve()
         public static interface Resolver
         {
             // can return a different output type to input, so long as sort order is maintained
@@ -759,15 +766,37 @@ public class BTree
             Object resolve(Object[] array, int lb, int ub);
         }
 
+        // a user-defined resolver that is applied automatically on encountering two duplicate values
+        public static interface QuickResolver<V>
+        {
+            // can return a different output type to input, so long as sort order is maintained
+            // if a resolver is present, this method will be called for every sequence of equal inputs
+            // even those with only one item
+            V resolve(V a, V b);
+        }
+
         Comparator<? super V> comparator;
-        Object[] values = new Object[10];
+        Object[] values;
         int count;
-        boolean detected; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
+        boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
         boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
+        QuickResolver<V> quickResolver;
 
         protected Builder(Comparator<? super V> comparator)
         {
+            this(comparator, 16);
+        }
+
+        protected Builder(Comparator<? super V> comparator, int initialCapacity)
+        {
             this.comparator = comparator;
+            this.values = new Object[initialCapacity];
+        }
+
+        public Builder<V> setQuickResolver(QuickResolver<V> quickResolver)
+        {
+            this.quickResolver = quickResolver;
+            return this;
         }
 
         public void reuse()
@@ -792,14 +821,20 @@ public class BTree
         {
             if (count == values.length)
                 values = Arrays.copyOf(values, count * 2);
-            values[count++] = v;
 
-            if (auto && detected && count > 1)
+            Object[] values = this.values;
+            int prevCount = this.count++;
+            values[prevCount] = v;
+
+            if (auto && detected && prevCount > 0)
             {
-                int c = comparator.compare((V) values[count - 2], (V) values[count - 1]);
+                V prev = (V) values[prevCount - 1];
+                int c = comparator.compare(prev, v);
                 if (c == 0 && auto)
                 {
-                    count--;
+                    count = prevCount;
+                    if (quickResolver != null)
+                        values[prevCount - 1] = quickResolver.resolve(prev, v);
                 }
                 else if (c > 0)
                 {
@@ -881,7 +916,11 @@ public class BTree
                 if (c > 0)
                     break;
                 else if (c == 0)
+                {
+                    if (quickResolver != null)
+                        a[i] = quickResolver.resolve(ai, aj);
                     j++;
+                }
                 i++;
             }
 
@@ -896,11 +935,14 @@ public class BTree
 
             while (i < curEnd && j < addEnd)
             {
+                V ai = (V) a[i];
+                V aj = (V) a[j];
                 // could avoid one comparison if we cared, but would make this ugly
-                int c = comparator.compare((V) a[i], (V) a[j]);
+                int c = comparator.compare(ai, aj);
                 if (c == 0)
                 {
-                    a[newCount++] = a[i];
+                    Object newValue = quickResolver == null ? ai : quickResolver.resolve(ai, aj);
+                    a[newCount++] = newValue;
                     i++;
                     j++;
                 }
@@ -931,6 +973,19 @@ public class BTree
             return count == 0;
         }
 
+        public Builder<V> reverse()
+        {
+            assert !auto;
+            int mid = count / 2;
+            for (int i = 0 ; i < mid ; i++)
+            {
+                Object t = values[i];
+                values[i] = values[count - (1 + i)];
+                values[count - (1 + i)] = t;
+            }
+            return this;
+        }
+
         public Builder<V> sort()
         {
             Arrays.sort((V[]) values, 0, count, comparator);
@@ -943,11 +998,17 @@ public class BTree
             if (!detected && count > 1)
             {
                 sort();
-                int c = 1;
+                int prevIdx = 0;
+                V prev = (V) values[0];
                 for (int i = 1 ; i < count ; i++)
-                    if (comparator.compare((V) values[i], (V) values[i - 1]) != 0)
-                        values[c++] = values[i];
-                count = c;
+                {
+                    V next = (V) values[i];
+                    if (comparator.compare(prev, next) != 0)
+                        values[++prevIdx] = prev = next;
+                    else if (quickResolver != null)
+                        values[prevIdx] = prev = quickResolver.resolve(prev, next);
+                }
+                count = prevIdx + 1;
             }
             detected = true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
index 9a7fa1b..0ab10c2 100644
--- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
+++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cassandra.utils.btree;
 
+import java.util.function.BiFunction;
+
 import com.google.common.base.Function;
 /**
  * An interface defining a function to be applied to both the object we are replacing in a BTree and
@@ -42,27 +44,26 @@ public interface UpdateFunction<K, V> extends Function<K, V>
      */
     void allocated(long heapSize);
 
-    static final UpdateFunction<Object, Object> noOp = new UpdateFunction<Object, Object>()
+    public static final class Simple<V> implements UpdateFunction<V, V>
     {
-        public Object apply(Object replacing, Object updating)
+        private final BiFunction<V, V, V> wrapped;
+        public Simple(BiFunction<V, V, V> wrapped)
         {
-            return updating;
+            this.wrapped = wrapped;
         }
 
-        public boolean abortEarly()
-        {
-            return false;
-        }
+        public V apply(V v) { return v; }
+        public V apply(V replacing, V update) { return wrapped.apply(replacing, update); }
+        public boolean abortEarly() { return false; }
+        public void allocated(long heapSize) { }
 
-        public void allocated(long heapSize)
+        public static <V> Simple<V> of(BiFunction<V, V, V> f)
         {
+            return new Simple<>(f);
         }
+    }
 
-        public Object apply(Object k)
-        {
-            return k;
-        }
-    };
+    static final Simple<Object> noOp = Simple.of((a, b) -> a);
 
     public static <K> UpdateFunction<K, K> noOp()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index 8fd470f..a76732f 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -46,16 +46,16 @@ public abstract class AbstractAllocator
 
     public abstract ByteBuffer allocate(int size);
 
-    public Row.Builder cloningArrayBackedRowBuilder(Columns columns)
+    public Row.Builder cloningBTreeRowBuilder(Columns columns)
     {
-        return new CloningArrayBackedRowBuilder(columns, this);
+        return new CloningBTreeRowBuilder(columns, this);
     }
 
-    private static class CloningArrayBackedRowBuilder extends BTreeBackedRow.Builder
+    private static class CloningBTreeRowBuilder extends BTreeRow.Builder
     {
         private final AbstractAllocator allocator;
 
-        private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator)
+        private CloningBTreeRowBuilder(Columns columns, AbstractAllocator allocator)
         {
             super(columns, true);
             this.allocator = allocator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index 31df444..8205f3b 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.utils.memory;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -35,7 +34,7 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
     public Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group writeOp, boolean isStatic)
     {
         Columns columns = isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars;
-        return allocator(writeOp).cloningArrayBackedRowBuilder(columns);
+        return allocator(writeOp).cloningBTreeRowBuilder(columns);
     }
 
     public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
index 37866d2..0e8c467 100644
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.cassandra.utils;
 
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -54,15 +57,16 @@ import static org.junit.Assert.assertTrue;
 public class LongBTreeTest
 {
 
+    private static final boolean DEBUG = false;
     private static int perThreadTrees = 10000;
-    private static int minTreeSize = 5;
-    private static int maxTreeSize = 15;
-    private static final boolean DEBUG = true;
+    private static int minTreeSize = 4;
+    private static int maxTreeSize = 10000;
+    private static int threads = DEBUG ? 1 : Runtime.getRuntime().availableProcessors() * 8;
     private static final MetricRegistry metrics = new MetricRegistry();
     private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
     private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
-    private static final ExecutorService MODIFY = Executors.newFixedThreadPool(DEBUG ? 1 : Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
-    private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+    private static final ExecutorService MODIFY = Executors.newFixedThreadPool(threads, new NamedThreadFactory("MODIFY"));
+    private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(threads, new NamedThreadFactory("COMPARE"));
     private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
 
     static
@@ -316,7 +320,7 @@ public class LongBTreeTest
                 latch.await(1L, TimeUnit.SECONDS);
                 Assert.assertEquals(0, errors.get());
             }
-            System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+            log("%.1f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : "");
         }
     }
 
@@ -466,8 +470,10 @@ public class LongBTreeTest
 
     private static RandomTree randomTree(int minSize, int maxSize)
     {
-        return ThreadLocalRandom.current().nextBoolean() ? randomTreeByUpdate(minSize, maxSize)
-                                                         : randomTreeByBuilder(minSize, maxSize);
+        // perform most of our tree constructions via update, as this is more efficient; since every run uses this
+        // we test builder disproportionately more often than if it had its own test anyway
+        return ThreadLocalRandom.current().nextFloat() < 0.95 ? randomTreeByUpdate(minSize, maxSize)
+                                                              : randomTreeByBuilder(minSize, maxSize);
     }
 
     private static RandomTree randomTreeByUpdate(int minSize, int maxSize)
@@ -600,7 +606,7 @@ public class LongBTreeTest
         TreeSet<Integer> canon = new TreeSet<>();
         for (int i = 0 ; i < 10000000 ; i++)
             canon.add(i);
-        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), null);
+        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), UpdateFunction.noOp());
         btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp());
         canon.add(Integer.MIN_VALUE);
         canon.add(Integer.MAX_VALUE);
@@ -611,47 +617,61 @@ public class LongBTreeTest
     @Test
     public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
     {
-        testInsertions(10000000, 50, 1, 1, true);
+        testInsertions(50, 1, 1, true);
     }
 
     @Test
     public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
     {
-        testInsertions(10000000, 50, 1, 5, true);
+        testInsertions(50, 1, 5, true);
     }
 
     @Test
     public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
     {
-        testInsertions(10000000, 500, 10, 1, true);
+        testInsertions(perThreadTrees / 10, 500, 10, 1, true);
     }
 
     @Test
     public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
     {
-        testInsertions(10000000, 500, 10, 10, true);
+        testInsertions(500, 10, 10, true);
     }
 
     @Test
     public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
     {
-        testInsertions(100000000, 5000, 3, 100, true);
+        testInsertions(perThreadTrees / 10, Math.max(maxTreeSize, 5000), 3, 100, true);
+    }
+
+    @Test
+    public void testRandomRangeAndBatches() throws ExecutionException, InterruptedException
+    {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int treeSize = random.nextInt(maxTreeSize / 10, maxTreeSize * 10);
+        for (int i = 0 ; i < perThreadTrees / 10 ; i++)
+            testInsertions(threads * 10, treeSize, random.nextInt(1, 100) / 10f, treeSize / 100, true);
     }
 
     @Test
     public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
     {
-        testInsertions(10000, 50, 10, 10, false);
+        testInsertions(50, 10, 10, false);
     }
 
-    private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+    private static void testInsertions(int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+    {
+        int tests = perThreadTrees * threads;
+        testInsertions(tests, perTestCount, testKeyRatio, modificationBatchSize, quickEquality);
+    }
+
+    private static void testInsertions(int tests, int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
     {
         int batchesPerTest = perTestCount / modificationBatchSize;
-        int maximumRunLength = 100;
-        int testKeyRange = perTestCount * testKeyRatio;
-        int tests = totalCount / perTestCount;
-        System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
-                tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+        int testKeyRange = (int) (perTestCount * testKeyRatio);
+        long totalCount = (long) perTestCount * tests;
+        log("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+            tests, perTestCount, 1 / testKeyRatio, modificationBatchSize);
 
         // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
         int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
@@ -660,30 +680,33 @@ public class LongBTreeTest
             final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
             for (int i = 0 ; i < chunkSize ; i++)
             {
-                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+                int maxRunLength = modificationBatchSize == 1 ? 1 : ThreadLocalRandom.current().nextInt(1, modificationBatchSize);
+                outer.add(doOneTestInsertions(testKeyRange, maxRunLength, modificationBatchSize, batchesPerTest, quickEquality));
             }
 
             final List<ListenableFuture<?>> inner = new ArrayList<>();
-            int complete = 0;
-            int reportInterval = totalCount / 100;
-            int lastReportAt = 0;
+            long complete = 0;
+            int reportInterval = Math.max(1000, (int) (totalCount / 10000));
+            long lastReportAt = 0;
             for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
             {
                 inner.addAll(f.get());
                 complete += perTestCount;
                 if (complete - lastReportAt >= reportInterval)
                 {
-                    System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+                    long done = (chunk * perTestCount) + complete;
+                    float ratio = done / (float) totalCount;
+                    log("Completed %.1f%% (%d of %d operations)", ratio * 100, done, totalCount);
                     lastReportAt = complete;
                 }
             }
             Futures.allAsList(inner).get();
         }
         Snapshot snap = BTREE_TIMER.getSnapshot();
-        System.out.println(String.format("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+        log("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
         snap = TREE_TIMER.getSnapshot();
-        System.out.println(String.format("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
-        System.out.println("Done");
+        log("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
+        log("Done");
     }
 
     private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
@@ -697,11 +720,11 @@ public class LongBTreeTest
                 NavigableMap<Integer, Integer> canon = new TreeMap<>();
                 Object[] btree = BTree.empty();
                 final TreeMap<Integer, Integer> buffer = new TreeMap<>();
-                final Random rnd = new Random();
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
                 for (int i = 0 ; i < iterations ; i++)
                 {
                     buffer.clear();
-                    int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+                    int mods = rnd.nextInt(1, averageModsPerIteration * 2);
                     while (mods > 0)
                     {
                         int v = rnd.nextInt(upperBound);
@@ -727,7 +750,7 @@ public class LongBTreeTest
 
                     if (!BTree.isWellFormed(btree, naturalOrder()))
                     {
-                        System.out.println("ERROR: Not well formed");
+                        log("ERROR: Not well formed");
                         throw new AssertionError("Not well formed!");
                     }
                     if (quickEquality)
@@ -754,7 +777,7 @@ public class LongBTreeTest
         for (int i = 0 ; i < 128 ; i++)
         {
             String id = String.format("[0..%d)", canon.size());
-            System.out.println("Testing " + id);
+            log("Testing " + id);
             Futures.allAsList(testAllSlices(id, cur, canon)).get();
             Object[] next = null;
             while (next == null)
@@ -819,7 +842,7 @@ public class LongBTreeTest
     {
         if (test != expect)
         {
-            System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
+            log("%s: Expected %d, Got %d", id, expect, test);
         }
     }
 
@@ -832,18 +855,18 @@ public class LongBTreeTest
             Object j = canon.next();
             if (!Objects.equals(i, j))
             {
-                System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
+                log("%s: Expected %d, Got %d", id, j, i);
                 equal = false;
             }
         }
         while (btree.hasNext())
         {
-            System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
+            log("%s: Expected <Nil>, Got %d", id, btree.next());
             equal = false;
         }
         while (canon.hasNext())
         {
-            System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
+            log("%s: Expected %d, Got Nil", id, canon.next());
             equal = false;
         }
         if (!equal)
@@ -930,4 +953,58 @@ public class LongBTreeTest
             return v;
         }
     }
+
+    public static void main(String[] args) throws ExecutionException, InterruptedException, InvocationTargetException, IllegalAccessException
+    {
+        for (String arg : args)
+        {
+            if (arg.startsWith("fan="))
+                System.setProperty("cassandra.btree.fanfactor", arg.substring(4));
+            else if (arg.startsWith("min="))
+                minTreeSize = Integer.parseInt(arg.substring(4));
+            else if (arg.startsWith("max="))
+                maxTreeSize = Integer.parseInt(arg.substring(4));
+            else if (arg.startsWith("count="))
+                perThreadTrees = Integer.parseInt(arg.substring(6));
+            else
+                exit();
+        }
+
+        List<Method> methods = new ArrayList<>();
+        for (Method m : LongBTreeTest.class.getDeclaredMethods())
+        {
+            if (m.getParameters().length > 0)
+                continue;
+            for (Annotation annotation : m.getAnnotations())
+                if (annotation.annotationType() == Test.class)
+                    methods.add(m);
+        }
+
+        LongBTreeTest test = new LongBTreeTest();
+        Collections.sort(methods, (a, b) -> a.getName().compareTo(b.getName()));
+        log(Lists.transform(methods, (m) -> m.getName()).toString());
+        for (Method m : methods)
+        {
+            log(m.getName());
+            m.invoke(test);
+        }
+        log("success");
+    }
+
+    private static void exit()
+    {
+        log("usage: fan=<int> min=<int> max=<int> count=<int>");
+        log("fan:   btree fanout");
+        log("min:   minimum btree size (must be >= 4)");
+        log("max:   maximum btree size (must be >= 4)");
+        log("count: number of trees to assign each core, for each test");
+    }
+
+    private static void log(String formatstr, Object ... args)
+    {
+        args = Arrays.copyOf(args, args.length + 1);
+        System.arraycopy(args, 0, args, 1, args.length - 1);
+        args[0] = System.currentTimeMillis();
+        System.out.printf("%tT: " + formatstr + "\n", args);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 358168f..e8451e0 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -297,16 +297,16 @@ public class Util
         }
     }
 
-    public static List<ArrayBackedPartition> getAllUnfiltered(ReadCommand command)
+    public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command)
     {
-        List<ArrayBackedPartition> results = new ArrayList<>();
+        List<ImmutableBTreePartition> results = new ArrayList<>();
         try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
         {
             while (iterator.hasNext())
             {
                 try (UnfilteredRowIterator partition = iterator.next())
                 {
-                    results.add(ArrayBackedPartition.create(partition));
+                    results.add(ImmutableBTreePartition.create(partition));
                 }
             }
         }
@@ -362,7 +362,7 @@ public class Util
         }
     }
 
-    public static ArrayBackedPartition getOnlyPartitionUnfiltered(ReadCommand cmd)
+    public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd)
     {
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
         {
@@ -370,7 +370,7 @@ public class Util
             try (UnfilteredRowIterator partition = iterator.next())
             {
                 assert !iterator.hasNext() : "Expecting a single partition but got more";
-                return ArrayBackedPartition.create(partition);
+                return ImmutableBTreePartition.create(partition);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 0448a16..5030029 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -66,16 +66,16 @@ public class CacheProviderTest
                                     cfm);
     }
 
-    private ArrayBackedCachedPartition createPartition()
+    private CachedBTreePartition createPartition()
     {
         PartitionUpdate update = new RowUpdateBuilder(cfm, System.currentTimeMillis(), "key1")
                                  .add("col1", "val1")
                                  .buildUpdate();
 
-        return ArrayBackedCachedPartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds());
+        return CachedBTreePartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds());
     }
 
-    private void simpleCase(ArrayBackedCachedPartition partition, ICache<MeasureableString, IRowCacheEntry> cache)
+    private void simpleCase(CachedBTreePartition partition, ICache<MeasureableString, IRowCacheEntry> cache)
     {
         cache.put(key1, partition);
         assertNotNull(cache.get(key1));
@@ -89,15 +89,15 @@ public class CacheProviderTest
         assertEquals(CAPACITY, cache.size());
     }
 
-    private void assertDigests(IRowCacheEntry one, ArrayBackedCachedPartition two)
+    private void assertDigests(IRowCacheEntry one, CachedBTreePartition two)
     {
-        assertTrue(one instanceof ArrayBackedCachedPartition);
+        assertTrue(one instanceof CachedBTreePartition);
         try
         {
             MessageDigest d1 = MessageDigest.getInstance("MD5");
             MessageDigest d2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) one).unfilteredIterator(), d1);
-            UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) two).unfilteredIterator(), d2);
+            UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1);
+            UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2);
             assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
         }
         catch (NoSuchAlgorithmException e)
@@ -106,7 +106,7 @@ public class CacheProviderTest
         }
     }
 
-    private void concurrentCase(final ArrayBackedCachedPartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException
+    private void concurrentCase(final CachedBTreePartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException
     {
         final long startTime = System.currentTimeMillis() + 500;
         Runnable runnable = new Runnable()
@@ -140,7 +140,7 @@ public class CacheProviderTest
     public void testSerializingCache() throws InterruptedException
     {
         ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<RefCountedMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer());
-        ArrayBackedCachedPartition partition = createPartition();
+        CachedBTreePartition partition = createPartition();
         simpleCase(partition, cache);
         concurrentCase(partition, cache);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 918e10d..721963d 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -754,7 +754,7 @@ public abstract class CQLTester
                 if (!Objects.equal(expectedByteValue, actualValue))
                 {
                     Object actualValueDecoded = actualValue == null ? null : column.type.getSerializer().deserialize(actualValue);
-                    if (!expected[j].equals(actualValueDecoded))
+                    if (!Objects.equal(expected[j], actualValueDecoded))
                         Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>",
                                                   i,
                                                   j,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index fbb7a5b..8c79689 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -20,9 +20,19 @@ package org.apache.cassandra.db;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -35,24 +45,17 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.Util.PartitionerSwitcher;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 public class BatchlogManagerTest
@@ -108,7 +111,7 @@ public class BatchlogManagerTest
                 .applyUnsafe();
 
         DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
-        ArrayBackedPartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
+        ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
         Iterator<Row> iter = results.iterator();
         assert iter.hasNext();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
index 6ab9a90..a65befd 100644
--- a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
@@ -86,7 +86,7 @@ public class DeletePartitionTest
             store.forceBlockingFlush();
 
         // validate removal
-        ArrayBackedPartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, key).build());
+        ImmutableBTreePartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, key).build());
         assertFalse(partitionUnfiltered.partitionLevelDeletion().isLive());
         assertFalse(partitionUnfiltered.iterator().hasNext());
     }