You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/12 23:53:41 UTC
[2/9] cassandra git commit: Remove ArrayBackedPartition and hierarchy
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
new file mode 100644
index 0000000..7e50716
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class BTreeRow extends AbstractRow
+{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY));
+
+ private final Clustering clustering;
+ private final Columns columns;
+ private final LivenessInfo primaryKeyLivenessInfo;
+ private final DeletionTime deletion;
+
+ // The data for each column present in this row, in column sorted order.
+ private final Object[] btree;
+
+ // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones, and then after reconciliation to remove
+ // all tombstones since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstones at all, so
+ // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
+ // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allows us, at any given
+ // time, to know if we have any deleted information or not. If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to
+ // Integer.MIN_VALUE, but if we don't and have expiring cells, this will be the time at which the first expiring cell expires. If we have no tombstones and
+ // no expiring cells, this will be Integer.MAX_VALUE.
+ private final int minLocalDeletionTime;
+
+ /**
+ * Builds a row directly from its components, without any validation.
+ * {@code minLocalDeletionTime} is stored as given, so callers must pass a value consistent with the other arguments.
+ */
+ private BTreeRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+ {
+ this.btree = btree;
+ this.columns = columns;
+ this.clustering = clustering;
+ this.deletion = deletion;
+ this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+ this.minLocalDeletionTime = minLocalDeletionTime;
+ }
+
+ /**
+ * Creates a row from already-built components, computing its minimum local deletion time from them.
+ *
+ * Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation
+ * methods below. This is only directly useful in a small number of cases.
+ *
+ * @param clustering the clustering of the row
+ * @param columns the columns of the row
+ * @param primaryKeyLivenessInfo the liveness info of the row's primary key
+ * @param deletion the row deletion
+ * @param btree the column data of the row, as a BTree of {@code ColumnData}
+ * @return the new row
+ */
+ public static BTreeRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+ {
+ // Use the same helper as Builder.build() and transformAndFilter() so the computation lives in one place;
+ // the helper stops scanning the column data as soon as Integer.MIN_VALUE is reached.
+ int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+ return new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+ }
+
+ /**
+ * Returns a row for {@code clustering} with no columns, no primary key liveness info and no deletion.
+ */
+ public static BTreeRow emptyRow(Clustering clustering)
+ {
+ // Nothing in such a row is deleted or expiring, hence Integer.MAX_VALUE.
+ final int noDeletionTime = Integer.MAX_VALUE;
+ return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), noDeletionTime);
+ }
+
+ public static BTreeRow singleCellRow(Clustering clustering, Cell cell)
+ {
+ if (cell.column().isSimple())
+ return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+
+ ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
+ return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+ }
+
+ /**
+ * Returns a row that carries only a row deletion. {@code deletion} must not be live.
+ */
+ public static BTreeRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+ {
+ assert !deletion.isLive();
+ // A row deletion is a "true" tombstone, so the minimum local deletion time is forced to Integer.MIN_VALUE.
+ final int tombstoneMarker = Integer.MIN_VALUE;
+ return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), tombstoneMarker);
+ }
+
+ public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
+ {
+ assert !primaryKeyLivenessInfo.isEmpty();
+ return new BTreeRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+ }
+
+ // Integer.MIN_VALUE for a tombstone, otherwise the cell's local deletion time.
+ private static int minDeletionTime(Cell cell)
+ {
+ if (cell.isTombstone())
+ return Integer.MIN_VALUE;
+ return cell.localDeletionTime();
+ }
+
+ // The local expiration time of expiring liveness info, otherwise Integer.MAX_VALUE.
+ private static int minDeletionTime(LivenessInfo info)
+ {
+ if (info.isExpiring())
+ return info.localExpirationTime();
+ return Integer.MAX_VALUE;
+ }
+
+ // Integer.MAX_VALUE for a live deletion time, Integer.MIN_VALUE for an actual deletion.
+ private static int minDeletionTime(DeletionTime dt)
+ {
+ if (dt.isLive())
+ return Integer.MAX_VALUE;
+ return Integer.MIN_VALUE;
+ }
+
+ // The minimum over the complex deletion and every cell of the column; stops early once Integer.MIN_VALUE is reached.
+ private static int minDeletionTime(ComplexColumnData cd)
+ {
+ int smallest = minDeletionTime(cd.complexDeletion());
+ Iterator<Cell> cells = cd.iterator();
+ while (cells.hasNext())
+ {
+ smallest = Math.min(smallest, minDeletionTime(cells.next()));
+ if (smallest == Integer.MIN_VALUE)
+ return smallest;
+ }
+ return smallest;
+ }
+
+ // Dispatches on the column kind: simple column data is a Cell, anything else is a ComplexColumnData.
+ private static int minDeletionTime(ColumnData cd)
+ {
+ if (cd.column().isSimple())
+ return minDeletionTime((Cell) cd);
+ return minDeletionTime((ComplexColumnData) cd);
+ }
+
+ // The minimum over the primary key liveness info, the row deletion and every column data in the btree;
+ // stops scanning the column data once Integer.MIN_VALUE is reached.
+ private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
+ {
+ int smallest = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+ for (ColumnData data : BTree.<ColumnData>iterable(btree))
+ {
+ smallest = Math.min(smallest, minDeletionTime(data));
+ if (smallest == Integer.MIN_VALUE)
+ return smallest;
+ }
+ return smallest;
+ }
+
+ /** Returns the clustering of this row. */
+ public Clustering clustering()
+ {
+ return this.clustering;
+ }
+
+ /** Returns the columns this row was created with. */
+ public Columns columns()
+ {
+ return this.columns;
+ }
+
+ /** Returns the liveness info of this row's primary key. */
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return this.primaryKeyLivenessInfo;
+ }
+
+ /**
+ * Returns whether this row has no primary key liveness info, no row deletion and no column data.
+ */
+ public boolean isEmpty()
+ {
+ if (!primaryKeyLivenessInfo().isEmpty())
+ return false;
+ if (!deletion().isLive())
+ return false;
+ return BTree.isEmpty(btree);
+ }
+
+ /** Returns the row deletion. */
+ public DeletionTime deletion()
+ {
+ return this.deletion;
+ }
+
+ /**
+ * Looks up the cell of the simple column {@code c} in this row.
+ *
+ * @param c a column that must not be complex
+ * @return whatever the btree lookup finds for {@code c}, cast to {@code Cell}
+ */
+ public Cell getCell(ColumnDefinition c)
+ {
+ assert !c.isComplex();
+ Object found = BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+ return (Cell) found;
+ }
+
+ /**
+ * Looks up the cell at {@code path} within the complex column {@code c}.
+ *
+ * @param c a column that must be complex
+ * @param path the path of the cell within the column
+ * @return {@code null} if this row has no data for {@code c}, otherwise the result of looking up {@code path} in that data
+ */
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ assert c.isComplex();
+ ComplexColumnData complexData = getComplexColumnData(c);
+ return complexData == null ? null : complexData.getCell(path);
+ }
+
+ /**
+ * Looks up the data of the complex column {@code c} in this row.
+ *
+ * @param c a column that must be complex
+ * @return whatever the btree lookup finds for {@code c}, cast to {@code ComplexColumnData}
+ */
+ public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ Object found = BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+ return (ComplexColumnData) found;
+ }
+
+ /** Returns an iterator over the column data of this row; this is the iterator returned by {@link #searchIterator()}. */
+ public Iterator<ColumnData> iterator()
+ {
+ Iterator<ColumnData> columnData = searchIterator();
+ return columnData;
+ }
+
+ /** Returns an iterable over all the cells of this row; every call to {@code iterator()} creates a new {@code CellIterator}. */
+ public Iterable<Cell> cells()
+ {
+ return () -> new CellIterator();
+ }
+
+ /** Returns an ascending search iterator over the column data of this row, searchable by column definition. */
+ public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ BTreeSearchIterator<ColumnDefinition, ColumnData> ascending =
+ BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+ return ascending;
+ }
+
+ /**
+ * Filters this row with no active deletion: equivalent to the four-argument overload called with
+ * {@code DeletionTime.LIVE} and {@code setActiveDeletionToRow == false}.
+ */
+ public Row filter(ColumnFilter filter, CFMetaData metadata)
+ {
+ final DeletionTime noActiveDeletion = DeletionTime.LIVE;
+ return filter(filter, noActiveDeletion, false, metadata);
+ }
+
+ /**
+ * Returns this row restricted to the columns fetched by {@code filter}, with data of dropped columns and data
+ * shadowed by {@code activeDeletion} removed.
+ *
+ * @param filter the filter selecting the columns to keep
+ * @param activeDeletion a deletion to apply to the content of this row
+ * @param setActiveDeletionToRow when {@code activeDeletion} supersedes the row deletion, whether the returned row
+ * carries {@code activeDeletion} as its deletion (otherwise it carries none)
+ * @param metadata the table metadata, used for its dropped columns
+ * @return the filtered row ({@code this} when nothing has to change), or {@code null} if nothing is left
+ */
+ public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
+ {
+ Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
+
+ // Fast path: all columns are wanted, the active deletion cannot shadow anything and no column was dropped.
+ if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+ return this;
+
+ boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+ LivenessInfo newInfo = mayHaveShadowed && activeDeletion.deletes(primaryKeyLivenessInfo.timestamp())
+ ? LivenessInfo.EMPTY
+ : primaryKeyLivenessInfo;
+
+ // mayHaveShadowed means the activeDeletion shadows the row deletion. So unless setActiveDeletionToRow is set,
+ // the row deletion is shadowed and we shouldn't return it.
+ DeletionTime newDeletion;
+ if (!mayHaveShadowed)
+ newDeletion = deletion;
+ else if (setActiveDeletionToRow)
+ newDeletion = activeDeletion;
+ else
+ newDeletion = DeletionTime.LIVE;
+
+ DeletionTime shadowingDeletion = mayHaveShadowed ? activeDeletion : DeletionTime.LIVE;
+ Columns fetched = filter.fetchedColumns().columns(isStatic());
+ Predicate<ColumnDefinition> inclusionTester = fetched.inOrderInclusionTester();
+ return transformAndFilter(newInfo, newDeletion, (cd) -> {
+
+ ColumnDefinition column = cd.column();
+ if (!inclusionTester.test(column))
+ return null;
+
+ CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+ if (column.isComplex())
+ return ((ComplexColumnData)cd).filter(filter, shadowingDeletion, dropped);
+
+ Cell cell = (Cell)cd;
+ // Data written at or before the time the column was dropped is discarded.
+ if (dropped != null && cell.timestamp() <= dropped.droppedTime)
+ return null;
+ if (mayHaveShadowed && activeDeletion.deletes(cell))
+ return null;
+ return cell;
+ });
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ // We start by the end cause we know complex columns sort before simple ones
+ for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
+ {
+ if (cd.column().isSimple())
+ return false;
+
+ if (!((ComplexColumnData)cd).complexDeletion().isLive())
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns this row with {@code markCounterLocalToBeCleared()} applied to the data of every column whose cell
+ * value type is a counter; other column data is kept as is.
+ */
+ public Row markCounterLocalToBeCleared()
+ {
+ return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> {
+ if (cd.column().cellValueType().isCounter())
+ return cd.markCounterLocalToBeCleared();
+ return cd;
+ });
+ }
+
+ /**
+ * Returns whether, as of {@code nowInSec}, this row may contain deleted or expired data, i.e. whether the
+ * minimum local deletion time of the row has been reached.
+ */
+ public boolean hasDeletion(int nowInSec)
+ {
+ return minLocalDeletionTime <= nowInSec;
+ }
+
+ /**
+ * Returns a copy of the row where all timestamps for live data have been replaced by {@code newTimestamp} and
+ * all deletion timestamps by {@code newTimestamp - 1}.
+ *
+ * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+ */
+ public Row updateAllTimestamp(long newTimestamp)
+ {
+ // Empty liveness info and a live deletion carry no timestamp to update and are kept untouched.
+ LivenessInfo newInfo = primaryKeyLivenessInfo;
+ if (!newInfo.isEmpty())
+ newInfo = newInfo.withUpdatedTimestamp(newTimestamp);
+
+ DeletionTime newDeletion = deletion;
+ if (!newDeletion.isLive())
+ newDeletion = new DeletionTime(newTimestamp - 1, newDeletion.localDeletionTime());
+
+ return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
+ }
+
+ /**
+ * Returns this row with everything {@code purger} decides to purge removed.
+ *
+ * @return {@code this} if the row has no deletion as of {@code nowInSec} or nothing changed, the purged row
+ * otherwise, or {@code null} if nothing is left
+ */
+ public Row purge(DeletionPurger purger, int nowInSec)
+ {
+ // Rows without any tombstone or expired data are returned without being iterated.
+ if (!hasDeletion(nowInSec))
+ return this;
+
+ LivenessInfo newInfo = primaryKeyLivenessInfo;
+ if (purger.shouldPurge(primaryKeyLivenessInfo, nowInSec))
+ newInfo = LivenessInfo.EMPTY;
+
+ DeletionTime newDeletion = deletion;
+ if (purger.shouldPurge(deletion))
+ newDeletion = DeletionTime.LIVE;
+
+ return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
+ }
+
+ /**
+ * Applies {@code function} to the column data of this row through {@code BTree.transformAndFilter} and
+ * combines the result with the given liveness info and deletion.
+ *
+ * @return {@code this} if the btree, liveness info and deletion are all the same instances as the current ones,
+ * {@code null} if the result would be an empty row, and a new row otherwise
+ */
+ private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+ {
+ Object[] transformed = BTree.transformAndFilter(btree, function);
+
+ boolean unchanged = btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion;
+ if (unchanged)
+ return this;
+
+ boolean nothingLeft = info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed);
+ if (nothingLeft)
+ return null;
+
+ return new BTreeRow(clustering, columns, info, deletion, transformed, minDeletionTime(transformed, info, deletion));
+ }
+
+ /**
+ * Returns the sum of the data sizes of the clustering, the primary key liveness info, the row deletion and
+ * every column data of this row.
+ */
+ public int dataSize()
+ {
+ int total = clustering.dataSize();
+ total += primaryKeyLivenessInfo.dataSize();
+ total += deletion.dataSize();
+
+ for (ColumnData data : this)
+ total += data.dataSize();
+ return total;
+ }
+
+ /**
+ * Returns the heap size of this row excluding data: the measured size of an empty row, plus the clustering,
+ * the btree structure and every column data.
+ */
+ public long unsharedHeapSizeExcludingData()
+ {
+ long total = EMPTY_SIZE;
+ total += clustering.unsharedHeapSizeExcludingData();
+ total += BTree.sizeOfStructureOnHeap(btree);
+
+ for (ColumnData data : this)
+ total += data.unsharedHeapSizeExcludingData();
+ return total;
+ }
+
+ /** Returns a new builder for {@code columns} that reports itself as sorted. */
+ public static Row.Builder sortedBuilder(Columns columns)
+ {
+ Row.Builder sorted = new Builder(columns, true);
+ return sorted;
+ }
+
+ /**
+ * Returns a new builder for {@code columns} that reports itself as unsorted; {@code nowInSec} is handed to the
+ * resolver that reconciles cells of the same column.
+ */
+ public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+ {
+ Row.Builder unsorted = new Builder(columns, false, nowInSec);
+ return unsorted;
+ }
+
+ // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
+ // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
+ // This method is in particular not exposed by the Row API on purpose.
+ // This method also *assumes* that the cell we're setting already exists.
+ public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
+ {
+ ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column);
+ if (column.isSimple())
+ BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value));
+ else
+ ((ComplexColumnData) current).setValue(path, value);
+ }
+
+ /**
+ * Returns an iterable over the cells of this row in legacy order; every call to {@code iterator()} creates a
+ * new {@code CellInLegacyOrderIterator} for {@code metadata}.
+ */
+ public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+ {
+ return new Iterable<Cell>()
+ {
+ public Iterator<Cell> iterator()
+ {
+ return new CellInLegacyOrderIterator(metadata);
+ }
+ };
+ }
+
+ // Iterates over every cell of the row, in column order, flattening complex columns into their cells.
+ private class CellIterator extends AbstractIterator<Cell>
+ {
+ private Iterator<ColumnData> columnData = iterator();
+ // The cells of the complex column currently being iterated, or null when not inside a complex column.
+ private Iterator<Cell> complexCells;
+
+ protected Cell computeNext()
+ {
+ for (;;)
+ {
+ // Drain the complex column in progress before moving on to the next column.
+ if (complexCells != null && complexCells.hasNext())
+ return complexCells.next();
+ complexCells = null;
+
+ if (!columnData.hasNext())
+ return endOfData();
+
+ ColumnData next = columnData.next();
+ if (!next.column().isComplex())
+ return (Cell)next;
+
+ complexCells = ((ComplexColumnData)next).iterator();
+ }
+ }
+ }
+
+ // Iterates over every cell of the row, ordering columns by name (using the comparator obtained from the table
+ // metadata) rather than by btree order. The btree content is copied into an array in which all simple columns
+ // come before the first complex one; the two ranges are then merged by comparing column names.
+ private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
+ {
+ private final AbstractType<?> comparator;
+ // Index in 'data' of the first complex column, or data.length if there is none.
+ private final int firstComplexIdx;
+ // Index of the next simple column to return; simple columns occupy [0, firstComplexIdx).
+ private int simpleIdx;
+ // Index of the next complex column to return; complex columns occupy [firstComplexIdx, data.length).
+ private int complexIdx;
+ // The cells of the complex column currently being iterated, or null when not inside a complex column.
+ private Iterator<Cell> complexCells;
+ private final Object[] data;
+
+ private CellInLegacyOrderIterator(CFMetaData metadata)
+ {
+ this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+
+ // copy btree into array for simple separate iteration of simple and complex columns
+ this.data = new Object[BTree.size(btree)];
+ BTree.toArray(btree, data, 0);
+
+ int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData);
+ this.firstComplexIdx = idx < 0 ? data.length : idx;
+ this.complexIdx = firstComplexIdx;
+ }
+
+ protected Cell computeNext()
+ {
+ while (true)
+ {
+ // Drain the complex column in progress before picking the next column.
+ if (complexCells != null)
+ {
+ if (complexCells.hasNext())
+ return complexCells.next();
+
+ complexCells = null;
+ }
+
+ if (simpleIdx >= firstComplexIdx)
+ {
+ // No simple column left: only complex columns, if any, remain.
+ if (complexIdx >= data.length)
+ return endOfData();
+
+ complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+ }
+ else
+ {
+ // No complex column left: return the remaining simple cells one by one.
+ if (complexIdx >= data.length)
+ return (Cell)data[simpleIdx++];
+
+ // Both kinds remain: the column whose name compares lower goes first.
+ if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
+ return (Cell)data[simpleIdx++];
+ else
+ complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+ }
+ }
+ }
+ }
+
+ public static class Builder implements Row.Builder
+ {
+ // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
+ // CellResolver recognises these markers with instanceof and rebuilds the DeletionTime from the marker's
+ // timestamp() and localDeletionTime().
+ private static class ComplexColumnDeletion extends BufferCell
+ {
+ // NOTE(review): presumably markedForDeleteAt is passed as the cell timestamp and 0 as the ttl, with
+ // CellPath.BOTTOM making the marker sort first - confirm against the BufferCell constructor.
+ public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime)
+ {
+ super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM);
+ }
+ }
+
+ // converts a run of Cell with equal column into a ColumnData
+ private static class CellResolver implements BTree.Builder.Resolver
+ {
+ final int nowInSec;
+ private CellResolver(int nowInSec)
+ {
+ this.nowInSec = nowInSec;
+ }
+
+ /**
+ * Resolves the cells in {@code cells[lb, ub)}, which all belong to the same column, into a single ColumnData.
+ *
+ * For a simple column the cells are reconciled into one cell. For a complex column the range is sorted by
+ * cell path, the leading complex deletion markers are folded into one deletion time, and the remaining
+ * cells are built into a ComplexColumnData.
+ *
+ * @param cells the array holding the run of cells; the range may be reordered in place
+ * @param lb the inclusive lower bound of the run
+ * @param ub the exclusive upper bound of the run
+ * @return the resolved column data
+ */
+ public ColumnData resolve(Object[] cells, int lb, int ub)
+ {
+ Cell cell = (Cell) cells[lb];
+ ColumnDefinition column = cell.column;
+ if (column.isSimple())
+ {
+ // Several cells for one simple column can only be reconciled if the builder was given a real nowInSec.
+ assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
+ while (++lb < ub)
+ cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec);
+ return cell;
+ }
+
+ // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are
+ // bedded in, as less important; galloping makes it pretty cheap anyway)
+ Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator());
+
+ // Complex deletions are added to the builder as marker cells that are expected to sort to the beginning
+ // of the run. Nothing prevents addComplexDeletion from being called more than once for a column, so
+ // consume every leading marker and keep the deletion that supersedes the others, rather than letting
+ // extra markers end up in the cell btree as if they were regular cells.
+ DeletionTime deletion = null;
+ while (lb < ub && cells[lb] instanceof ComplexColumnDeletion)
+ {
+ Cell marker = (Cell) cells[lb++];
+ DeletionTime candidate = new DeletionTime(marker.timestamp(), marker.localDeletionTime());
+ if (deletion == null || candidate.supersedes(deletion))
+ deletion = candidate;
+ }
+ if (deletion == null)
+ deletion = DeletionTime.LIVE;
+
+ List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+ Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
+ return new ComplexColumnData(column, btree, deletion);
+ }
+
+ }
+ protected final Columns columns;
+
+ protected Clustering clustering;
+ protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+ protected DeletionTime deletion = DeletionTime.LIVE;
+
+ private final boolean isSorted;
+ private final BTree.Builder<Cell> cells;
+ private final CellResolver resolver;
+ private boolean hasComplex = false;
+
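+ // Complex deletions are not stored separately: addComplexDeletion() adds a ComplexColumnDeletion marker to 'cells',
+ // which CellResolver turns back into the complex deletion of the column when the row is built.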
+
+ // Creates a builder with no usable "now": Integer.MIN_VALUE is passed as nowInSecs, and CellResolver asserts
+ // that it never has to reconcile several cells of the same simple column in that case.
+ protected Builder(Columns columns, boolean isSorted)
+ {
+ this(columns, isSorted, Integer.MIN_VALUE);
+ }
+
+ // Creates a builder for the given columns; nowInSecs is handed to the CellResolver used when building rows.
+ protected Builder(Columns columns, boolean isSorted, int nowInSecs)
+ {
+ this.columns = columns;
+ this.isSorted = isSorted;
+ this.resolver = new CellResolver(nowInSecs);
+ this.cells = BTree.builder(ColumnData.comparator);
+ // Automatic handling is switched off on the cell builder; build() sorts and resolves explicitly.
+ this.cells.auto(false);
+ }
+
+ /** Returns whether this builder was created as a sorted builder. */
+ public boolean isSorted()
+ {
+ return this.isSorted;
+ }
+
+ /**
+ * Starts a new row with the given clustering. The previous row, if any, must have been completed with
+ * {@code build()}, which resets the clustering to {@code null}.
+ */
+ public void newRow(Clustering clustering)
+ {
+ assert this.clustering == null; // Ensures we've properly called build() if we've used this builder before
+ this.clustering = clustering;
+ }
+
+ /** Returns the clustering of the row being built, or {@code null} if no row has been started. */
+ public Clustering clustering()
+ {
+ return this.clustering;
+ }
+
+ /**
+ * Puts the builder back into its initial state so that it can be used to build another row.
+ */
+ protected void reset()
+ {
+ this.clustering = null;
+ this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+ this.deletion = DeletionTime.LIVE;
+ this.cells.reuse();
+ // Without this, one row with complex data would make build() resolve every later row built by this
+ // builder, defeating the "sorted and no complex values" shortcut.
+ this.hasComplex = false;
+ }
+
+ /** Sets the primary key liveness info of the row being built, replacing any previously set one. */
+ public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+ {
+ primaryKeyLivenessInfo = info;
+ }
+
+ /** Sets the deletion of the row being built, replacing any previously set one. */
+ public void addRowDeletion(DeletionTime deletion)
+ {
+ this.deletion = deletion;
+ }
+
+ /**
+ * Adds a cell to the row being built. Cells of static columns may only be added to the static row, and
+ * cells of other columns only to non-static rows.
+ */
+ public void addCell(Cell cell)
+ {
+ assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
+ cells.add(cell);
+ // Remember that build() will have to resolve the cells into complex column data.
+ if (cell.column.isComplex())
+ hasComplex = true;
+ }
+
+ /**
+ * Adds a complex deletion for {@code column} to the row being built. The deletion is recorded as a marker
+ * cell alongside the regular cells.
+ */
+ public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+ {
+ Cell marker = new ComplexColumnDeletion(column, complexDeletion);
+ cells.add(marker);
+ hasComplex = true;
+ }
+
+ /**
+ * Builds the row from everything added since {@code newRow}, then resets the builder for the next row.
+ */
+ public Row build()
+ {
+ if (!isSorted)
+ {
+ cells.sort();
+ cells.resolve(resolver);
+ }
+ else if (hasComplex)
+ {
+ // A sorted builder only needs resolving when it holds complex values: otherwise it only has unique
+ // simple cells, which are already in their final condition.
+ cells.resolve(resolver);
+ }
+
+ Object[] btree = cells.build();
+ Row built = new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime(btree, primaryKeyLivenessInfo, deletion));
+ reset();
+ return built;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index ca62c47..efa40ad 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -240,6 +240,17 @@ public class EncodingStats
isTTLSet ? minTTL : TTL_EPOCH,
isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
}
+
+ /**
+ * Collects encoding stats from a deletion info, a static row (unless it is empty) and all the rows of an
+ * iterator, which is consumed entirely.
+ */
+ public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
+ {
+ Collector collector = new Collector();
+ deletionInfo.collectStats(collector);
+ if (!staticRow.isEmpty())
+ Rows.collectStats(staticRow, collector);
+ rows.forEachRemaining(row -> Rows.collectStats(row, collector));
+ return collector.get();
+ }
}
public static class Serializer
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 33ad447..996e89a 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -395,7 +395,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
- : BTreeBackedRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
+ : BTreeRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
}
public Clustering mergedClustering()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index 766cf19..30f5c50 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -49,7 +49,12 @@ public abstract class RowIterators
iterator.next().digest(digest);
}
- public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+ /** Returns a row iterator for the given partition that contains no rows. */
+ public static RowIterator emptyIterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder)
+ {
+ Iterator<Row> noRows = Collections.<Row>emptyIterator();
+ return iterator(cfm, partitionKey, isReverseOrder, noRows);
+ }
+
+ public static RowIterator iterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder, Iterator<Row> iterator)
{
return new RowIterator()
{
@@ -78,23 +83,16 @@ public abstract class RowIterators
return Rows.EMPTY_STATIC_ROW;
}
+ public void close() { }
+
public boolean hasNext()
{
- return false;
+ return iterator.hasNext();
}
public Row next()
{
- throw new NoSuchElementException();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- public void close()
- {
+ return iterator.next();
}
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index bacd591..0b739a8 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -49,7 +49,7 @@ public abstract class Rows
private Rows() {}
- public static final Row EMPTY_STATIC_ROW = BTreeBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
+ public static final Row EMPTY_STATIC_ROW = BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING);
public static Row.Builder copy(Row row, Row.Builder builder)
{
@@ -217,7 +217,7 @@ public abstract class Rows
public static Row merge(Row row1, Row row2, int nowInSec)
{
Columns mergedColumns = row1.columns().mergeTo(row2.columns());
- Row.Builder builder = BTreeBackedRow.sortedBuilder(mergedColumns);
+ Row.Builder builder = BTreeRow.sortedBuilder(mergedColumns);
merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index f17ccca..2102534 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer
final SerializationHeader sHeader = header.sHeader;
return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
{
- private final Row.Builder builder = BTreeBackedRow.sortedBuilder(sHeader.columns().regulars);
+ private final Row.Builder builder = BTreeRow.sortedBuilder(sHeader.columns().regulars);
protected Unfiltered computeNext()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 650a18d..60f0dcb 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -89,10 +89,14 @@ public abstract class UnfilteredRowIterators
return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
}
+ /**
+ * Returns an iterator for the given partition with no rows, an empty static row and no partition deletion.
+ */
+ public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+ {
+ final Row noStaticRow = Rows.EMPTY_STATIC_ROW;
+ final DeletionTime noPartitionDeletion = DeletionTime.LIVE;
+ return noRowsIterator(cfm, partitionKey, noStaticRow, noPartitionDeletion, isReverseOrder);
+ }
/**
* Returns an empty atom iterator for a given partition.
*/
- public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
+ public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
{
return new UnfilteredRowIterator()
{
@@ -118,12 +122,12 @@ public abstract class UnfilteredRowIterators
public DeletionTime partitionLevelDeletion()
{
- return DeletionTime.LIVE;
+ return partitionDeletion;
}
public Row staticRow()
{
- return Rows.EMPTY_STATIC_ROW;
+ return staticRow;
}
public EncodingStats stats()
@@ -225,7 +229,7 @@ public abstract class UnfilteredRowIterators
@Override
protected Row computeNextStatic(Row row)
{
- Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics);
+ Row.Builder staticBuilder = allocator.cloningBTreeRowBuilder(columns().statics);
return Rows.copy(row, staticBuilder).build();
}
@@ -233,7 +237,7 @@ public abstract class UnfilteredRowIterators
protected Row computeNext(Row row)
{
if (regularBuilder == null)
- regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars);
+ regularBuilder = allocator.cloningBTreeRowBuilder(columns().regulars);
return Rows.copy(row, regularBuilder).build();
}
@@ -541,7 +545,7 @@ public abstract class UnfilteredRowIterators
{
Row merged = rowMerger.merge(markerMerger.activeDeletion());
if (listener != null)
- listener.onMergedRows(merged == null ? BTreeBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+ listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
return merged;
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index f306e6d..14b06cf 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -341,7 +341,7 @@ public class UnfilteredSerializer
{
int flags = in.readUnsignedByte();
assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
- Row.Builder builder = BTreeBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
+ Row.Builder builder = BTreeRow.sortedBuilder(helper.fetchedStaticColumns(header));
builder.newRow(Clustering.STATIC_CLUSTERING);
return deserializeRowBody(in, header, helper, flags, builder);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/MaterializedView.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java
index 39b2769..7337e4b 100644
--- a/src/java/org/apache/cassandra/db/view/MaterializedView.java
+++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java
@@ -52,10 +52,10 @@ import org.apache.cassandra.db.ReadOrderGroup;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -208,7 +208,7 @@ public class MaterializedView
*
* @return true if {@param partition} modifies a column included in the view
*/
- public boolean updateAffectsView(AbstractThreadUnsafePartition partition)
+ public boolean updateAffectsView(AbstractBTreePartition partition)
{
// If we are including all of the columns, then any update will be included
if (includeAll)
@@ -268,7 +268,7 @@ public class MaterializedView
int nowInSec)
{
CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+ Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
builder.newRow(viewClustering(temporalRow, resolver));
builder.addRowDeletion(deletionTime);
return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
@@ -286,7 +286,7 @@ public class MaterializedView
{
CFMetaData viewCfm = getViewCfs().metadata;
- Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
+ Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec);
builder.newRow(viewClustering(temporalRow, resolver));
builder.addComplexDeletion(deletedColumn, deletionTime);
return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build());
@@ -363,7 +363,7 @@ public class MaterializedView
return null;
}
- Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
+ Row.Builder regularBuilder = BTreeRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec);
CBuilder clustering = CBuilder.create(viewCfs.getComparator());
for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++)
@@ -395,7 +395,7 @@ public class MaterializedView
* @return View Tombstones which delete all of the rows which have been removed from the base table with
* {@param partition}
*/
- private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition)
+ private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition)
{
final TemporalRow.Resolver resolver = TemporalRow.earliest;
@@ -558,7 +558,7 @@ public class MaterializedView
/**
* @return Set of rows which are contained in the partition update {@param partition}
*/
- private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition)
+ private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition)
{
Set<ColumnIdentifier> columns = new HashSet<>();
for (ColumnDefinition def : this.columns.primaryKeyDefs)
@@ -578,7 +578,7 @@ public class MaterializedView
* have been applied successfully. This is based solely on the changes that are necessary given the current
* state of the base table and the newly applying partition data.
*/
- public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding)
+ public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding)
{
if (!updateAffectsView(partition))
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index d0ba5ea..00fdf48 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -45,7 +45,7 @@ import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
@@ -309,7 +309,7 @@ public class TemporalRow
return null;
}
- public DeletionTime deletionTime(AbstractThreadUnsafePartition partition)
+ public DeletionTime deletionTime(AbstractBTreePartition partition)
{
DeletionInfo deletionInfo = partition.deletionInfo();
if (!deletionInfo.getPartitionDeletion().isLive())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 65a5259..7382e5e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -71,7 +71,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
{
super(metadata, in, helper);
this.header = header;
- this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+ this.builder = BTreeRow.sortedBuilder(helper.fetchedRegularColumns(header));
}
public Row readStaticRow() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index c9d9fa5..9213b20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -35,7 +34,6 @@ import javax.management.ObjectName;
import com.google.common.util.concurrent.Futures;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +48,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
+import org.apache.cassandra.db.partitions.CachedBTreePartition;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -431,7 +429,7 @@ public class CacheService implements CacheServiceMBean
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
{
- CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+ CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 6bfe94a..3dc323e 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -214,7 +214,7 @@ public class DataResolver extends ResponseResolver
{
if (currentRows[i] == null)
{
- currentRows[i] = BTreeBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+ currentRows[i] = BTreeRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
currentRows[i].newRow(clustering);
}
return currentRows[i];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 733067e..cb0667a 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -824,7 +824,7 @@ public class CassandraServer implements Cassandra.Iface
{
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
Cell cell = cellFromColumn(metadata, name, column);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
mutation = new org.apache.cassandra.db.Mutation(update);
}
@@ -1329,7 +1329,7 @@ public class CassandraServer implements Cassandra.Iface
}
else if (column_path.super_column != null && column_path.column == null)
{
- Row row = BTreeBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+ Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
}
else
@@ -1339,7 +1339,7 @@ public class CassandraServer implements Cassandra.Iface
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path);
- update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
+ update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeRow.singleCellRow(name.clustering, cell));
}
catch (UnknownColumnException e)
{
@@ -2138,7 +2138,7 @@ public class CassandraServer implements Cassandra.Iface
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index 09e8d4b..084e835 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -112,7 +112,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
super(results);
assert results.metadata().isStaticCompactTable();
this.nowInSec = nowInSec;
- this.builder = BTreeBackedRow.sortedBuilder(results.columns().regulars);
+ this.builder = BTreeRow.sortedBuilder(results.columns().regulars);
}
private void init()
@@ -220,7 +220,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
this.superColumnMapColumn = results.metadata().compactValueColumn();
assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
- this.builder = BTreeBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
+ this.builder = BTreeRow.sortedBuilder(Columns.of(superColumnMapColumn));
this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index d75d2f1..40d4094 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -156,7 +156,7 @@ public class TriggerExecutor
return merged;
}
- private Collection<PartitionUpdate> validateForSinglePartition(UUID cfId,
+ private List<PartitionUpdate> validateForSinglePartition(UUID cfId,
DecoratedKey key,
Collection<Mutation> tmutations)
throws InvalidRequestException
@@ -165,7 +165,7 @@ public class TriggerExecutor
if (tmutations.size() == 1)
{
- Collection<PartitionUpdate> updates = Iterables.getOnlyElement(tmutations).getPartitionUpdates();
+ List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates());
if (updates.size() > 1)
throw new InvalidRequestException("The updates generated by triggers are not all for the same partition");
validateSamePartition(cfId, key, Iterables.getOnlyElement(updates));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 353e7a5..fe08011 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -183,7 +183,7 @@ public class BTree
return btree;
}
- public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<K> comparator)
+ public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<? super K> comparator, UpdateFunction<K, K> updateF)
{
if (size(tree1) < size(tree2))
{
@@ -191,7 +191,7 @@ public class BTree
tree1 = tree2;
tree2 = tmp;
}
- return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), UpdateFunction.<K>noOp());
+ return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), updateF);
}
public static <V> Iterator<V> iterator(Object[] btree)
@@ -749,8 +749,15 @@ public class BTree
return new Builder<>(comparator);
}
+ public static <V> Builder<V> builder(Comparator<? super V> comparator, int initialCapacity)
+ {
+ return new Builder<>(comparator, initialCapacity); // forward the capacity hint; the one-arg constructor always sizes the backing array at 16
+ }
+
public static class Builder<V>
{
+
+ // a user-defined bulk resolution, to be applied manually via resolve()
public static interface Resolver
{
// can return a different output type to input, so long as sort order is maintained
@@ -759,15 +766,37 @@ public class BTree
Object resolve(Object[] array, int lb, int ub);
}
+ // a user-defined resolver that is applied automatically on encountering two duplicate values
+ public static interface QuickResolver<V>
+ {
+ // merges two values that the builder's comparator reports as equal into the single value that is kept;
+ // a is the value currently retained and b the duplicate that follows it. In the call sites of this
+ // patch it is only invoked when a duplicate is actually found, never for a value that has no equal
+ V resolve(V a, V b);
+ }
+
Comparator<? super V> comparator;
- Object[] values = new Object[10];
+ Object[] values;
int count;
- boolean detected; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
+ boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
+ QuickResolver<V> quickResolver;
protected Builder(Comparator<? super V> comparator)
{
+ this(comparator, 16);
+ }
+
+ protected Builder(Comparator<? super V> comparator, int initialCapacity)
+ {
this.comparator = comparator;
+ this.values = new Object[initialCapacity];
+ }
+
+ public Builder<V> setQuickResolver(QuickResolver<V> quickResolver)
+ {
+ this.quickResolver = quickResolver;
+ return this;
}
public void reuse()
@@ -792,14 +821,20 @@ public class BTree
{
if (count == values.length)
values = Arrays.copyOf(values, count * 2);
- values[count++] = v;
- if (auto && detected && count > 1)
+ Object[] values = this.values;
+ int prevCount = this.count++;
+ values[prevCount] = v;
+
+ if (auto && detected && prevCount > 0)
{
- int c = comparator.compare((V) values[count - 2], (V) values[count - 1]);
+ V prev = (V) values[prevCount - 1];
+ int c = comparator.compare(prev, v);
if (c == 0 && auto)
{
- count--;
+ count = prevCount;
+ if (quickResolver != null)
+ values[prevCount - 1] = quickResolver.resolve(prev, v);
}
else if (c > 0)
{
@@ -881,7 +916,11 @@ public class BTree
if (c > 0)
break;
else if (c == 0)
+ {
+ if (quickResolver != null)
+ a[i] = quickResolver.resolve(ai, aj);
j++;
+ }
i++;
}
@@ -896,11 +935,14 @@ public class BTree
while (i < curEnd && j < addEnd)
{
+ V ai = (V) a[i];
+ V aj = (V) a[j];
// could avoid one comparison if we cared, but would make this ugly
- int c = comparator.compare((V) a[i], (V) a[j]);
+ int c = comparator.compare(ai, aj);
if (c == 0)
{
- a[newCount++] = a[i];
+ Object newValue = quickResolver == null ? ai : quickResolver.resolve(ai, aj);
+ a[newCount++] = newValue;
i++;
j++;
}
@@ -931,6 +973,19 @@ public class BTree
return count == 0;
}
+ public Builder<V> reverse()
+ {
+ assert !auto; // only permitted once the caller has taken over responsibility for ordering (auto == false)
+ int mid = count / 2; // with an odd count the middle element stays where it is
+ for (int i = 0 ; i < mid ; i++)
+ {
+ Object t = values[i];
+ values[i] = values[count - (1 + i)]; // swap with the mirror position, counted back from the last populated slot
+ values[count - (1 + i)] = t;
+ }
+ return this;
+ }
+
public Builder<V> sort()
{
Arrays.sort((V[]) values, 0, count, comparator);
@@ -943,11 +998,17 @@ public class BTree
if (!detected && count > 1)
{
sort();
- int c = 1;
+ int prevIdx = 0;
+ V prev = (V) values[0];
for (int i = 1 ; i < count ; i++)
- if (comparator.compare((V) values[i], (V) values[i - 1]) != 0)
- values[c++] = values[i];
- count = c;
+ {
+ V next = (V) values[i];
+ if (comparator.compare(prev, next) != 0)
+ values[++prevIdx] = prev = next;
+ else if (quickResolver != null)
+ values[prevIdx] = prev = quickResolver.resolve(prev, next);
+ }
+ count = prevIdx + 1;
}
detected = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
index 9a7fa1b..0ab10c2 100644
--- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
+++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
@@ -18,6 +18,8 @@
*/
package org.apache.cassandra.utils.btree;
+import java.util.function.BiFunction;
+
import com.google.common.base.Function;
/**
* An interface defining a function to be applied to both the object we are replacing in a BTree and
@@ -42,27 +44,26 @@ public interface UpdateFunction<K, V> extends Function<K, V>
*/
void allocated(long heapSize);
- static final UpdateFunction<Object, Object> noOp = new UpdateFunction<Object, Object>()
+ public static final class Simple<V> implements UpdateFunction<V, V>
{
- public Object apply(Object replacing, Object updating)
+ private final BiFunction<V, V, V> wrapped;
+ public Simple(BiFunction<V, V, V> wrapped)
{
- return updating;
+ this.wrapped = wrapped;
}
- public boolean abortEarly()
- {
- return false;
- }
+ public V apply(V v) { return v; }
+ public V apply(V replacing, V update) { return wrapped.apply(replacing, update); }
+ public boolean abortEarly() { return false; }
+ public void allocated(long heapSize) { }
- public void allocated(long heapSize)
+ public static <V> Simple<V> of(BiFunction<V, V, V> f)
{
+ return new Simple<>(f);
}
+ }
- public Object apply(Object k)
- {
- return k;
- }
- };
+ static final Simple<Object> noOp = Simple.of((a, b) -> a);
public static <K> UpdateFunction<K, K> noOp()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index 8fd470f..a76732f 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.rows.BTreeBackedRow;
+import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -46,16 +46,16 @@ public abstract class AbstractAllocator
public abstract ByteBuffer allocate(int size);
- public Row.Builder cloningArrayBackedRowBuilder(Columns columns)
+ public Row.Builder cloningBTreeRowBuilder(Columns columns)
{
- return new CloningArrayBackedRowBuilder(columns, this);
+ return new CloningBTreeRowBuilder(columns, this);
}
- private static class CloningArrayBackedRowBuilder extends BTreeBackedRow.Builder
+ private static class CloningBTreeRowBuilder extends BTreeRow.Builder
{
private final AbstractAllocator allocator;
- private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator)
+ private CloningBTreeRowBuilder(Columns columns, AbstractAllocator allocator)
{
super(columns, true);
this.allocator = allocator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index 31df444..8205f3b 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -35,7 +34,7 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
public Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group writeOp, boolean isStatic)
{
Columns columns = isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars;
- return allocator(writeOp).cloningArrayBackedRowBuilder(columns);
+ return allocator(writeOp).cloningBTreeRowBuilder(columns);
}
public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
index 37866d2..0e8c467 100644
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.cassandra.utils;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -54,15 +57,16 @@ import static org.junit.Assert.assertTrue;
public class LongBTreeTest
{
+ private static final boolean DEBUG = false;
private static int perThreadTrees = 10000;
- private static int minTreeSize = 5;
- private static int maxTreeSize = 15;
- private static final boolean DEBUG = true;
+ private static int minTreeSize = 4;
+ private static int maxTreeSize = 10000;
+ private static int threads = DEBUG ? 1 : Runtime.getRuntime().availableProcessors() * 8;
private static final MetricRegistry metrics = new MetricRegistry();
private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
- private static final ExecutorService MODIFY = Executors.newFixedThreadPool(DEBUG ? 1 : Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
- private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
+ private static final ExecutorService MODIFY = Executors.newFixedThreadPool(threads, new NamedThreadFactory("MODIFY"));
+ private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(threads, new NamedThreadFactory("COMPARE"));
private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
static
@@ -316,7 +320,7 @@ public class LongBTreeTest
latch.await(1L, TimeUnit.SECONDS);
Assert.assertEquals(0, errors.get());
}
- System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
+ log("%.1f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : "");
}
}
@@ -466,8 +470,10 @@ public class LongBTreeTest
private static RandomTree randomTree(int minSize, int maxSize)
{
- return ThreadLocalRandom.current().nextBoolean() ? randomTreeByUpdate(minSize, maxSize)
- : randomTreeByBuilder(minSize, maxSize);
+ // perform most of our tree constructions via update, as this is more efficient; since every run uses this
+ // we test builder disproportionately more often than if it had its own test anyway
+ return ThreadLocalRandom.current().nextFloat() < 0.95 ? randomTreeByUpdate(minSize, maxSize)
+ : randomTreeByBuilder(minSize, maxSize);
}
private static RandomTree randomTreeByUpdate(int minSize, int maxSize)
@@ -600,7 +606,7 @@ public class LongBTreeTest
TreeSet<Integer> canon = new TreeSet<>();
for (int i = 0 ; i < 10000000 ; i++)
canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), null);
+ Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), UpdateFunction.noOp());
btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp());
canon.add(Integer.MIN_VALUE);
canon.add(Integer.MAX_VALUE);
@@ -611,47 +617,61 @@ public class LongBTreeTest
@Test
public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
{
- testInsertions(10000000, 50, 1, 1, true);
+ testInsertions(50, 1, 1, true);
}
@Test
public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
{
- testInsertions(10000000, 50, 1, 5, true);
+ testInsertions(50, 1, 5, true);
}
@Test
public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
{
- testInsertions(10000000, 500, 10, 1, true);
+ testInsertions(perThreadTrees / 10, 500, 10, 1, true);
}
@Test
public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
{
- testInsertions(10000000, 500, 10, 10, true);
+ testInsertions(500, 10, 10, true);
}
@Test
public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
{
- testInsertions(100000000, 5000, 3, 100, true);
+ testInsertions(perThreadTrees / 10, Math.max(maxTreeSize, 5000), 3, 100, true);
+ }
+
+ @Test
+ public void testRandomRangeAndBatches() throws ExecutionException, InterruptedException
+ {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int treeSize = random.nextInt(maxTreeSize / 10, maxTreeSize * 10); // drawn once and reused by every round below
+ for (int i = 0 ; i < perThreadTrees / 10 ; i++)
+ testInsertions(threads * 10, treeSize, random.nextInt(1, 100) / 10f, treeSize / 100, true); // key-range ratio is redrawn each round, in [0.1, 9.9]
+ }
@Test
public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
{
- testInsertions(10000, 50, 10, 10, false);
+ testInsertions(50, 10, 10, false);
}
- private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+ private static void testInsertions(int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
+ {
+ int tests = perThreadTrees * threads; // default test count: perThreadTrees for each configured thread
+ testInsertions(tests, perTestCount, testKeyRatio, modificationBatchSize, quickEquality);
+ }
+
+ private static void testInsertions(int tests, int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
{
int batchesPerTest = perTestCount / modificationBatchSize;
- int maximumRunLength = 100;
- int testKeyRange = perTestCount * testKeyRatio;
- int tests = totalCount / perTestCount;
- System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
- tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
+ int testKeyRange = (int) (perTestCount * testKeyRatio);
+ long totalCount = (long) perTestCount * tests;
+ log("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
+ tests, perTestCount, 1 / testKeyRatio, modificationBatchSize);
// if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
@@ -660,30 +680,33 @@ public class LongBTreeTest
final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
for (int i = 0 ; i < chunkSize ; i++)
{
- outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
+ int maxRunLength = modificationBatchSize == 1 ? 1 : ThreadLocalRandom.current().nextInt(1, modificationBatchSize);
+ outer.add(doOneTestInsertions(testKeyRange, maxRunLength, modificationBatchSize, batchesPerTest, quickEquality));
}
final List<ListenableFuture<?>> inner = new ArrayList<>();
- int complete = 0;
- int reportInterval = totalCount / 100;
- int lastReportAt = 0;
+ long complete = 0;
+ int reportInterval = Math.max(1000, (int) (totalCount / 10000));
+ long lastReportAt = 0;
for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
{
inner.addAll(f.get());
complete += perTestCount;
if (complete - lastReportAt >= reportInterval)
{
- System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
+ long done = (chunk * perTestCount) + complete;
+ float ratio = done / (float) totalCount;
+ log("Completed %.1f%% (%d of %d operations)", ratio * 100, done, totalCount);
lastReportAt = complete;
}
}
Futures.allAsList(inner).get();
}
Snapshot snap = BTREE_TIMER.getSnapshot();
- System.out.println(String.format("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
+ log("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
snap = TREE_TIMER.getSnapshot();
- System.out.println(String.format("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
- System.out.println("Done");
+ log("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile());
+ log("Done");
}
private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
@@ -697,11 +720,11 @@ public class LongBTreeTest
NavigableMap<Integer, Integer> canon = new TreeMap<>();
Object[] btree = BTree.empty();
final TreeMap<Integer, Integer> buffer = new TreeMap<>();
- final Random rnd = new Random();
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0 ; i < iterations ; i++)
{
buffer.clear();
- int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
+ int mods = rnd.nextInt(1, averageModsPerIteration * 2);
while (mods > 0)
{
int v = rnd.nextInt(upperBound);
@@ -727,7 +750,7 @@ public class LongBTreeTest
if (!BTree.isWellFormed(btree, naturalOrder()))
{
- System.out.println("ERROR: Not well formed");
+ log("ERROR: Not well formed");
throw new AssertionError("Not well formed!");
}
if (quickEquality)
@@ -754,7 +777,7 @@ public class LongBTreeTest
for (int i = 0 ; i < 128 ; i++)
{
String id = String.format("[0..%d)", canon.size());
- System.out.println("Testing " + id);
+ log("Testing " + id);
Futures.allAsList(testAllSlices(id, cur, canon)).get();
Object[] next = null;
while (next == null)
@@ -819,7 +842,7 @@ public class LongBTreeTest
{
if (test != expect)
{
- System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
+ log("%s: Expected %d, Got %d", id, expect, test);
}
}
@@ -832,18 +855,18 @@ public class LongBTreeTest
Object j = canon.next();
if (!Objects.equals(i, j))
{
- System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
+ log("%s: Expected %d, Got %d", id, j, i);
equal = false;
}
}
while (btree.hasNext())
{
- System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
+ log("%s: Expected <Nil>, Got %d", id, btree.next());
equal = false;
}
while (canon.hasNext())
{
- System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
+ log("%s: Expected %d, Got Nil", id, canon.next());
equal = false;
}
if (!equal)
@@ -930,4 +953,58 @@ public class LongBTreeTest
return v;
}
}
+
+ /**
+  * Command-line entry point. Applies any {@code fan=}, {@code min=}, {@code max=} and {@code count=}
+  * overrides, then reflectively invokes every zero-argument method declared on this class that is
+  * annotated with {@code Test}, in method-name order, on a single fresh instance.
+  */
+ public static void main(String[] args) throws ExecutionException, InterruptedException, InvocationTargetException, IllegalAccessException
+ {
+     for (String option : args)
+     {
+         if (option.startsWith("fan="))
+         {
+             System.setProperty("cassandra.btree.fanfactor", option.substring(4));
+             continue;
+         }
+         if (option.startsWith("min="))
+         {
+             minTreeSize = Integer.parseInt(option.substring(4));
+             continue;
+         }
+         if (option.startsWith("max="))
+         {
+             maxTreeSize = Integer.parseInt(option.substring(4));
+             continue;
+         }
+         if (option.startsWith("count="))
+         {
+             perThreadTrees = Integer.parseInt(option.substring(6));
+             continue;
+         }
+         // anything else is not a recognised option
+         exit();
+     }
+
+     List<Method> tests = new ArrayList<>();
+     for (Method candidate : LongBTreeTest.class.getDeclaredMethods())
+     {
+         // only parameterless methods can be invoked reflectively below without arguments
+         if (candidate.getParameterCount() == 0 && candidate.isAnnotationPresent(Test.class))
+             tests.add(candidate);
+     }
+
+     LongBTreeTest instance = new LongBTreeTest();
+     Collections.sort(tests, (left, right) -> left.getName().compareTo(right.getName()));
+     log(Lists.transform(tests, Method::getName).toString());
+     for (Method current : tests)
+     {
+         log(current.getName());
+         current.invoke(instance);
+     }
+     log("success");
+ }
+
+ /**
+  * Prints the command-line usage summary and terminates the JVM with a non-zero status.
+  * Called by {@code main} when it meets an argument it does not recognise.
+  */
+ private static void exit()
+ {
+     log("usage: fan=<int> min=<int> max=<int> count=<int>");
+     log("fan: btree fanout");
+     log("min: minimum btree size (must be >= 4)");
+     log("max: maximum btree size (must be >= 4)");
+     log("count: number of trees to assign each core, for each test");
+     // Actually stop: returning here would let the caller carry on and run the whole
+     // test suite even though the command line was invalid.
+     System.exit(1);
+ }
+
+ /**
+  * Writes one line to standard output: the current wall-clock time (formatted with {@code %tT}),
+  * a colon and space, then {@code formatstr} expanded with {@code args} by {@code printf}.
+  *
+  * @param formatstr a {@code java.util.Formatter} format string; it is always interpreted as a format,
+  *                  even when no arguments are supplied
+  * @param args      the values referenced by {@code formatstr}
+  */
+ private static void log(String formatstr, Object ... args)
+ {
+     // slot 0 carries the timestamp consumed by the leading %tT; the caller's arguments follow it
+     Object[] stamped = new Object[args.length + 1];
+     System.arraycopy(args, 0, stamped, 1, args.length);
+     stamped[0] = System.currentTimeMillis();
+     System.out.printf("%tT: " + formatstr + "\n", stamped);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 358168f..e8451e0 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -297,16 +297,16 @@ public class Util
}
}
- public static List<ArrayBackedPartition> getAllUnfiltered(ReadCommand command)
+ public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command)
{
- List<ArrayBackedPartition> results = new ArrayList<>();
+ List<ImmutableBTreePartition> results = new ArrayList<>();
try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
{
while (iterator.hasNext())
{
try (UnfilteredRowIterator partition = iterator.next())
{
- results.add(ArrayBackedPartition.create(partition));
+ results.add(ImmutableBTreePartition.create(partition));
}
}
}
@@ -362,7 +362,7 @@ public class Util
}
}
- public static ArrayBackedPartition getOnlyPartitionUnfiltered(ReadCommand cmd)
+ public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd)
{
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
{
@@ -370,7 +370,7 @@ public class Util
try (UnfilteredRowIterator partition = iterator.next())
{
assert !iterator.hasNext() : "Expecting a single partition but got more";
- return ArrayBackedPartition.create(partition);
+ return ImmutableBTreePartition.create(partition);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 0448a16..5030029 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -66,16 +66,16 @@ public class CacheProviderTest
cfm);
}
- private ArrayBackedCachedPartition createPartition()
+ private CachedBTreePartition createPartition()
{
PartitionUpdate update = new RowUpdateBuilder(cfm, System.currentTimeMillis(), "key1")
.add("col1", "val1")
.buildUpdate();
- return ArrayBackedCachedPartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds());
+ return CachedBTreePartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds());
}
- private void simpleCase(ArrayBackedCachedPartition partition, ICache<MeasureableString, IRowCacheEntry> cache)
+ private void simpleCase(CachedBTreePartition partition, ICache<MeasureableString, IRowCacheEntry> cache)
{
cache.put(key1, partition);
assertNotNull(cache.get(key1));
@@ -89,15 +89,15 @@ public class CacheProviderTest
assertEquals(CAPACITY, cache.size());
}
- private void assertDigests(IRowCacheEntry one, ArrayBackedCachedPartition two)
+ private void assertDigests(IRowCacheEntry one, CachedBTreePartition two)
{
- assertTrue(one instanceof ArrayBackedCachedPartition);
+ assertTrue(one instanceof CachedBTreePartition);
try
{
MessageDigest d1 = MessageDigest.getInstance("MD5");
MessageDigest d2 = MessageDigest.getInstance("MD5");
- UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) one).unfilteredIterator(), d1);
- UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) two).unfilteredIterator(), d2);
+ UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1);
+ UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2);
assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
}
catch (NoSuchAlgorithmException e)
@@ -106,7 +106,7 @@ public class CacheProviderTest
}
}
- private void concurrentCase(final ArrayBackedCachedPartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException
+ private void concurrentCase(final CachedBTreePartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException
{
final long startTime = System.currentTimeMillis() + 500;
Runnable runnable = new Runnable()
@@ -140,7 +140,7 @@ public class CacheProviderTest
public void testSerializingCache() throws InterruptedException
{
ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<RefCountedMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer());
- ArrayBackedCachedPartition partition = createPartition();
+ CachedBTreePartition partition = createPartition();
simpleCase(partition, cache);
concurrentCase(partition, cache);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 918e10d..721963d 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -754,7 +754,7 @@ public abstract class CQLTester
if (!Objects.equal(expectedByteValue, actualValue))
{
Object actualValueDecoded = actualValue == null ? null : column.type.getSerializer().deserialize(actualValue);
- if (!expected[j].equals(actualValueDecoded))
+ if (!Objects.equal(expected[j], actualValueDecoded))
Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>",
i,
j,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index fbb7a5b..8c79689 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -20,9 +20,19 @@ package org.apache.cassandra.db;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -35,24 +45,17 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
public class BatchlogManagerTest
@@ -108,7 +111,7 @@ public class BatchlogManagerTest
.applyUnsafe();
DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
- ArrayBackedPartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
+ ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
Iterator<Row> iter = results.iterator();
assert iter.hasNext();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
index 6ab9a90..a65befd 100644
--- a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java
@@ -86,7 +86,7 @@ public class DeletePartitionTest
store.forceBlockingFlush();
// validate removal
- ArrayBackedPartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, key).build());
+ ImmutableBTreePartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, key).build());
assertFalse(partitionUnfiltered.partitionLevelDeletion().isLive());
assertFalse(partitionUnfiltered.iterator().hasNext());
}