You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/28 16:00:59 UTC
[1/2] cassandra git commit: Use BTree to back default Row and
ComplexColumnData objects
Repository: cassandra
Updated Branches:
refs/heads/trunk de420e50e -> 639d4b240
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index 95bad48..84763e5 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -31,42 +31,45 @@ import org.apache.cassandra.serializers.MarshalException;
* In practice, there are only 2 implementations of this: either {@link Cell} for simple columns
* or {@code ComplexColumnData} for complex columns.
*/
-public interface ColumnData
+public abstract class ColumnData
{
public static final Comparator<ColumnData> comparator = (cd1, cd2) -> cd1.column().compareTo(cd2.column());
- // A comparator for the cells of the *similar* ColumnData, i.e. one that assumes the cells are all for the same column.
- public static final Comparator<Cell> cellComparator = (c1, c2) -> c1.column().cellPathComparator().compare(c1.path(), c2.path());
+ protected final ColumnDefinition column;
+ protected ColumnData(ColumnDefinition column)
+ {
+ this.column = column;
+ }
/**
* The column this is data for.
*
* @return the column this is a data for.
*/
- public ColumnDefinition column();
+ public final ColumnDefinition column() { return column; }
/**
* The size of the data held by this {@code ColumnData}.
*
* @return the size used by the data of this {@code ColumnData}.
*/
- public int dataSize();
+ public abstract int dataSize();
- public long unsharedHeapSizeExcludingData();
+ public abstract long unsharedHeapSizeExcludingData();
/**
* Validate the column data.
*
* @throws MarshalException if the data is not valid.
*/
- public void validate();
+ public abstract void validate();
/**
* Adds the data to the provided digest.
*
* @param digest the {@code MessageDigest} to add the data to.
*/
- public void digest(MessageDigest digest);
+ public abstract void digest(MessageDigest digest);
/**
* Returns a copy of the data where all timestamps for live data have been replaced by {@code newTimestamp} and
@@ -74,9 +77,9 @@ public interface ColumnData
*
* This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
*/
- public ColumnData updateAllTimestamp(long newTimestamp);
+ public abstract ColumnData updateAllTimestamp(long newTimestamp);
- public ColumnData markCounterLocalToBeCleared();
+ public abstract ColumnData markCounterLocalToBeCleared();
- public ColumnData purge(DeletionPurger purger, int nowInSec);
+ public abstract ColumnData purge(DeletionPurger purger, int nowInSec);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index d87402a..76ab7e7 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
+import java.util.function.BiFunction;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -31,59 +33,52 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
/**
* The data for a complex column, that is its cells and potential complex
* deletion time.
*/
-public class ComplexColumnData implements ColumnData, Iterable<Cell>
+public class ComplexColumnData extends ColumnData implements Iterable<Cell>
{
static final Cell[] NO_CELLS = new Cell[0];
private static final long EMPTY_SIZE = ObjectSizes.measure(new ComplexColumnData(ColumnDefinition.regularDef("", "", "", SetType.getInstance(ByteType.instance, true)), NO_CELLS, new DeletionTime(0, 0)));
- private final ColumnDefinition column;
-
// The cells for 'column' sorted by cell path.
- private final Cell[] cells;
+ private final Object[] cells;
private final DeletionTime complexDeletion;
// Only ArrayBackedRow should call this.
- ComplexColumnData(ColumnDefinition column, Cell[] cells, DeletionTime complexDeletion)
+ ComplexColumnData(ColumnDefinition column, Object[] cells, DeletionTime complexDeletion)
{
+ super(column);
assert column.isComplex();
assert cells.length > 0 || !complexDeletion.isLive();
- this.column = column;
this.cells = cells;
this.complexDeletion = complexDeletion;
}
public boolean hasCells()
{
- return cellsCount() > 0;
+ return !BTree.isEmpty(cells);
}
public int cellsCount()
{
- return cells.length;
- }
-
- public ColumnDefinition column()
- {
- return column;
+ return BTree.size(cells);
}
public Cell getCell(CellPath path)
{
- int idx = binarySearch(path);
- return idx < 0 ? null : cells[idx];
+ return (Cell) BTree.<Object>find(cells, column.asymmetricCellPathComparator(), path);
}
- public Cell getCellByIndex(int i)
+ public Cell getCellByIndex(int idx)
{
- assert 0 <= i && i < cells.length;
- return cells[i];
+ return BTree.findByIndex(cells, idx);
}
/**
@@ -104,13 +99,13 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
public Iterator<Cell> iterator()
{
- return Iterators.forArray(cells);
+ return BTree.iterator(cells);
}
public int dataSize()
{
int size = complexDeletion.dataSize();
- for (Cell cell : cells)
+ for (Cell cell : this)
size += cell.dataSize();
return size;
}
@@ -118,167 +113,78 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
- for (Cell cell : cells)
+ // TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation
+ for (Cell cell : this)
heapSize += cell.unsharedHeapSizeExcludingData();
return heapSize;
}
public void validate()
{
- for (Cell cell : cells)
+ for (Cell cell : this)
cell.validate();
}
- public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
- {
- ColumnFilter.Tester cellTester = filter.newTester(column);
- if (cellTester == null && activeDeletion.isLive() && dropped == null)
- return this;
-
- DeletionTime newComplexDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
-
- int newSize = 0;
- for (Cell cell : cells)
- {
- // The cell must be:
- // - Included by the query
- // - not shadowed by the active deletion
- // - not being for a dropped column
- if ((cellTester == null || cellTester.includes(cell.path()))
- && !activeDeletion.deletes(cell)
- && (dropped == null || cell.timestamp() > dropped.droppedTime))
- ++newSize;
- }
-
-
- if (newSize == 0)
- return newComplexDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newComplexDeletion);
-
- if (newSize == cells.length && newComplexDeletion == complexDeletion)
- return this;
-
- Cell[] newCells = new Cell[newSize];
- int j = 0;
- cellTester = filter.newTester(column); // we need to reste the tester
- for (Cell cell : cells)
- {
- if ((cellTester == null || cellTester.includes(cell.path()))
- && !activeDeletion.deletes(cell)
- && (dropped == null || cell.timestamp() > dropped.droppedTime))
- newCells[j++] = cell;
- }
- assert j == newSize;
-
- return new ComplexColumnData(column, newCells, newComplexDeletion);
- }
-
public void digest(MessageDigest digest)
{
if (!complexDeletion.isLive())
complexDeletion.digest(digest);
- for (Cell cell : cells)
+ for (Cell cell : this)
cell.digest(digest);
}
public ComplexColumnData markCounterLocalToBeCleared()
{
- Cell[] newCells = null;
- for (int i = 0; i < cells.length; i++)
- {
- Cell cell = cells[i];
- Cell marked = cell.markCounterLocalToBeCleared();
- if (marked != cell)
- {
- if (newCells == null)
- newCells = Arrays.copyOf(cells, cells.length);
- newCells[i] = marked;
- }
- }
+ return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);
+ }
+
+ public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
+ {
+ ColumnFilter.Tester cellTester = filter.newTester(column);
+ if (cellTester == null && activeDeletion.isLive() && dropped == null)
+ return this;
- return newCells == null
- ? this
- : new ComplexColumnData(column, newCells, complexDeletion);
+ DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+ return transformAndFilter(newDeletion,
+ (cell) ->
+ (cellTester == null || cellTester.includes(cell.path()))
+ && !activeDeletion.deletes(cell)
+ && (dropped == null || cell.timestamp() > dropped.droppedTime)
+ ? cell : null);
}
public ComplexColumnData purge(DeletionPurger purger, int nowInSec)
{
DeletionTime newDeletion = complexDeletion.isLive() || purger.shouldPurge(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
+ return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, nowInSec));
+ }
- int newSize = 0;
- for (Cell cell : cells)
- {
- Cell purged = cell.purge(purger, nowInSec);
- if (purged != null)
- ++newSize;
- }
-
- if (newSize == 0)
- return newDeletion.isLive() ? null : new ComplexColumnData(column, NO_CELLS, newDeletion);
+ private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function)
+ {
+ Object[] transformed = BTree.transformAndFilter(cells, function);
- if (newDeletion == complexDeletion && newSize == cells.length)
+ if (cells == transformed && newDeletion == complexDeletion)
return this;
- Cell[] newCells = new Cell[newSize];
- int j = 0;
- for (Cell cell : cells)
- {
- Cell purged = cell.purge(purger, nowInSec);
- if (purged != null)
- newCells[j++] = purged;
- }
- assert j == newSize;
+ if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(transformed))
+ return null;
- return new ComplexColumnData(column, newCells, newDeletion);
+ return new ComplexColumnData(column, transformed, newDeletion);
}
public ComplexColumnData updateAllTimestamp(long newTimestamp)
{
DeletionTime newDeletion = complexDeletion.isLive() ? complexDeletion : new DeletionTime(newTimestamp - 1, complexDeletion.localDeletionTime());
- Cell[] newCells = new Cell[cells.length];
- for (int i = 0; i < cells.length; i++)
- newCells[i] = (Cell)cells[i].updateAllTimestamp(newTimestamp);
-
- return new ComplexColumnData(column, newCells, newDeletion);
+ return transformAndFilter(newDeletion, (cell) -> (Cell) cell.updateAllTimestamp(newTimestamp));
}
// This is the partner in crime of ArrayBackedRow.setValue. The exact same warnings apply. The short
// version is: "don't use that method".
void setValue(CellPath path, ByteBuffer value)
{
- int idx = binarySearch(path);
- assert idx >= 0;
- cells[idx] = cells[idx].withUpdatedValue(value);
- }
-
- private int binarySearch(CellPath path)
- {
- return binarySearch(path, 0, cells.length);
- }
-
- /**
- * Simple binary search for a given cell (in the cells array).
- *
- * The return value has the exact same meaning that the one of Collections.binarySearch() but
- * we don't use the later because we're searching for a 'CellPath' in an array of 'Cell'.
- */
- private int binarySearch(CellPath path, int fromIndex, int toIndex)
- {
- int low = fromIndex;
- int mid = toIndex;
- int high = mid - 1;
- int result = -1;
- while (low <= high)
- {
- mid = (low + high) >> 1;
- if ((result = column.cellPathComparator().compare(path, cells[mid].path())) > 0)
- low = mid + 1;
- else if (result == 0)
- return mid;
- else
- high = mid - 1;
- }
- return -mid - (result < 0 ? 1 : 2);
+ Cell current = (Cell) BTree.<Object>find(cells, column.asymmetricCellPathComparator(), path);
+ BTree.replaceInSitu(cells, column.cellComparator(), current, current.withUpdatedValue(value));
}
@Override
@@ -293,7 +199,7 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
ComplexColumnData that = (ComplexColumnData)other;
return this.column().equals(that.column())
&& this.complexDeletion().equals(that.complexDeletion)
- && Arrays.equals(this.cells, that.cells);
+ && BTree.equals(this.cells, that.cells);
}
@Override
@@ -309,15 +215,20 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
public static class Builder
{
- private ColumnDefinition column;
+ private static BiFunction<Cell, Cell, Cell> noResolve = (a, b) -> {
+ throw new IllegalStateException();
+ };
+
private DeletionTime complexDeletion;
- public final List<Cell> cells = new ArrayList<>();
+ private ColumnDefinition column;
+ private BTree.Builder<Cell> builder;
public void newColumn(ColumnDefinition column)
{
this.column = column;
this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called
- this.cells.clear();
+ if (builder == null) builder = BTree.builder(column.cellComparator());
+ else builder.reuse(column.cellComparator());
}
public void addComplexDeletion(DeletionTime complexDeletion)
@@ -328,16 +239,15 @@ public class ComplexColumnData implements ColumnData, Iterable<Cell>
public void addCell(Cell cell)
{
assert cell.column().equals(column);
- assert cells.isEmpty() || cell.column().cellPathComparator().compare(cells.get(cells.size() - 1).path(), cell.path()) < 0;
- cells.add(cell);
+ builder.add(cell);
}
public ComplexColumnData build()
{
- if (complexDeletion.isLive() && cells.isEmpty())
+ if (complexDeletion.isLive() && builder.isEmpty())
return null;
- return new ComplexColumnData(column, cells.toArray(new Cell[cells.size()]), complexDeletion);
+ return new ComplexColumnData(column, builder.build(), complexDeletion);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index ad21c69..929a5fb 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
/**
* Storage engine representation of a row.
@@ -381,7 +383,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
- : ArrayBackedRow.create(clustering, columns, rowInfo, rowDeletion, dataBuffer.size(), dataBuffer.toArray(new ColumnData[dataBuffer.size()]));
+ : BTreeBackedRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
}
public Clustering mergedClustering()
@@ -463,7 +465,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
cellReducer.setActiveDeletion(activeDeletion);
}
- Iterator<Cell> cells = MergeIterator.get(complexCells, ColumnData.cellComparator, cellReducer);
+ // The cells merged here are expected to all belong to the same complex column, so they must be
+ // ordered by cell path. ColumnData.comparator only compares columns and would make every cell
+ // compare as equal, causing cells with different paths to be reduced together.
+ Iterator<Cell> cells = MergeIterator.get(complexCells, (c1, c2) -> c1.column().cellPathComparator().compare(c1.path(), c2.path()), cellReducer);
while (cells.hasNext())
{
Cell merged = cells.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index 122f7d3..bacd591 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -49,7 +49,7 @@ public abstract class Rows
private Rows() {}
- public static final Row EMPTY_STATIC_ROW = ArrayBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
+ public static final Row EMPTY_STATIC_ROW = BTreeBackedRow.emptyRow(Clustering.STATIC_CLUSTERING);
public static Row.Builder copy(Row row, Row.Builder builder)
{
@@ -217,7 +217,7 @@ public abstract class Rows
public static Row merge(Row row1, Row row2, int nowInSec)
{
Columns mergedColumns = row1.columns().mergeTo(row2.columns());
- Row.Builder builder = ArrayBackedRow.sortedBuilder(mergedColumns);
+ Row.Builder builder = BTreeBackedRow.sortedBuilder(mergedColumns);
merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index ec46751..3a12584 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer
final SerializationHeader sHeader = header.sHeader;
return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats())
{
- private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars);
+ private final Row.Builder builder = BTreeBackedRow.sortedBuilder(sHeader.columns().regulars);
protected Unfiltered computeNext()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 5f110bb..650a18d 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -541,7 +541,7 @@ public abstract class UnfilteredRowIterators
{
Row merged = rowMerger.merge(markerMerger.activeDeletion());
if (listener != null)
- listener.onMergedRows(merged == null ? ArrayBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
+ listener.onMergedRows(merged == null ? BTreeBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows());
return merged;
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 358c841..f6eb62a 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -341,7 +341,7 @@ public class UnfilteredSerializer
{
int flags = in.readUnsignedByte();
assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
- Row.Builder builder = ArrayBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
+ Row.Builder builder = BTreeBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
builder.newRow(Clustering.STATIC_CLUSTERING);
return deserializeRowBody(in, header, helper, flags, builder);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 56b621a..5a0a0ad 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -71,7 +71,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
{
super(metadata, in, helper);
this.header = header;
- this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+ this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
}
public Row readStaticRow() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 515a0d8..a1b5c96 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -214,7 +214,7 @@ public class DataResolver extends ResponseResolver
{
if (currentRows[i] == null)
{
- currentRows[i] = ArrayBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+ currentRows[i] = BTreeBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
currentRows[i].newRow(clustering);
}
return currentRows[i];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 37b3476..b38f58e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.*;
import com.google.common.primitives.Longs;
@@ -823,7 +822,7 @@ public class CassandraServer implements Cassandra.Iface
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name);
Cell cell = cellFromColumn(metadata, name, column);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
mutation = new org.apache.cassandra.db.Mutation(update);
}
@@ -1319,7 +1318,7 @@ public class CassandraServer implements Cassandra.Iface
}
else if (column_path.super_column != null && column_path.column == null)
{
- Row row = ArrayBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
+ Row row = BTreeBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec));
update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
}
else
@@ -1329,7 +1328,7 @@ public class CassandraServer implements Cassandra.Iface
LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column);
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path);
- update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+ update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
}
catch (UnknownColumnException e)
{
@@ -2114,7 +2113,7 @@ public class CassandraServer implements Cassandra.Iface
CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path);
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, ArrayBackedRow.singleCellRow(name.clustering, cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell));
org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index 9c5a99f..59d1b68 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -112,7 +112,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
super(results);
assert results.metadata().isStaticCompactTable();
this.nowInSec = nowInSec;
- this.builder = ArrayBackedRow.sortedBuilder(results.columns().regulars);
+ this.builder = BTreeBackedRow.sortedBuilder(results.columns().regulars);
}
private void init()
@@ -220,7 +220,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator
this.superColumnMapColumn = results.metadata().compactValueColumn();
assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType;
- this.builder = ArrayBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
+ this.builder = BTreeBackedRow.sortedBuilder(Columns.of(superColumnMapColumn));
this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 6e15638..62942b4 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -20,10 +20,15 @@ package org.apache.cassandra.utils.btree;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import org.apache.cassandra.utils.ObjectSizes;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -68,11 +73,15 @@ public class BTree
// An empty BTree branch - used only for internal purposes in Modifier
static final Object[] EMPTY_BRANCH = new Object[] { null, new int[0] };
- /**
- * Returns an empty BTree
- *
- * @return
- */
+ /**
+ * Direction of iteration: either {@code ASC} or {@code DESC}.
+ */
+ public static enum Dir
+ {
+ ASC, DESC;
+ /** @return the opposite direction: {@code DESC} for {@code ASC} and vice versa. */
+ public Dir invert() { return this == ASC ? DESC : ASC; }
+ /** @return {@code ASC} if {@code asc} is true, {@code DESC} otherwise. */
+ public static Dir asc(boolean asc) { return asc ? ASC : DESC; }
+ /** @return {@code DESC} if {@code desc} is true, {@code ASC} otherwise. */
+ public static Dir desc(boolean desc) { return desc ? DESC : ASC; }
+ }
+
public static Object[] empty()
{
return EMPTY_LEAF;
@@ -85,18 +94,35 @@ public class BTree
public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF)
{
- return build(source, source.size(), updateF);
+ return buildInternal(source, source.size(), updateF);
+ }
+
+ public static <C, K extends C, V extends C> Object[] build(Iterable<K> source, UpdateFunction<K, V> updateF)
+ {
+ return buildInternal(source, -1, updateF);
}
/**
* Creates a BTree containing all of the objects in the provided collection
*
* @param source the items to build the tree with. MUST BE IN STRICTLY ASCENDING ORDER.
+ * @param size the size of the source iterable
* @return a btree representing the contents of the provided iterable
*/
public static <C, K extends C, V extends C> Object[] build(Iterable<K> source, int size, UpdateFunction<K, V> updateF)
{
- if (size < FAN_FACTOR)
+ if (size < 0)
+ throw new IllegalArgumentException(Integer.toString(size));
+ return buildInternal(source, size, updateF);
+ }
+
+ /**
+ * As build(), except:
+ * @param size the size of the source, or a negative value if the size is unknown
+ */
+ private static <C, K extends C, V extends C> Object[] buildInternal(Iterable<K> source, int size, UpdateFunction<K, V> updateF)
+ {
+ if ((size >= 0) & (size < FAN_FACTOR))
{
if (size == 0)
return EMPTY_LEAF;
@@ -168,17 +194,47 @@ public class BTree
return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), UpdateFunction.<K>noOp());
}
+ /**
+ * Returns an iterator over the provided tree in ascending order.
+ * Equivalent to {@code iterator(btree, Dir.ASC)}.
+ *
+ * @param btree the tree to iterate over
+ */
+ public static <V> Iterator<V> iterator(Object[] btree)
+ {
+ return iterator(btree, Dir.ASC);
+ }
+
+ /**
+ * Returns an iterator over the provided tree in the given direction.
+ *
+ * @param btree the tree to iterate over
+ * @param dir direction of iteration
+ */
+ public static <V> Iterator<V> iterator(Object[] btree, Dir dir)
+ {
+ // no comparator is supplied (null) to the search iterator here
+ return new BTreeSearchIterator<V, V>(btree, null, dir);
+ }
+
+ /**
+ * Returns an iterator over a sub-range of the provided tree in the given direction.
+ *
+ * @param btree the tree to iterate over
+ * @param lb lower bound of the range -- presumably an inclusive item index, as in slice(); TODO confirm
+ * @param ub upper bound of the range -- presumably an inclusive item index, as in slice(); TODO confirm
+ * @param dir direction of iteration
+ */
+ public static <V> Iterator<V> iterator(Object[] btree, int lb, int ub, Dir dir)
+ {
+ // no comparator is supplied (null) to the search iterator here
+ return new BTreeSearchIterator<V, V>(btree, null, dir, lb, ub);
+ }
+
+ /**
+ * Returns an {@code Iterable} view of the provided tree in ascending order.
+ * Equivalent to {@code iterable(btree, Dir.ASC)}.
+ *
+ * @param btree the tree to iterate over
+ */
+ public static <V> Iterable<V> iterable(Object[] btree)
+ {
+ return iterable(btree, Dir.ASC);
+ }
+
+ /**
+ * Returns an {@code Iterable} view of the provided tree; each call to its {@code iterator()}
+ * creates a new iterator via {@link #iterator(Object[], Dir)}.
+ *
+ * @param btree the tree to iterate over
+ * @param dir direction of iteration
+ */
+ public static <V> Iterable<V> iterable(Object[] btree, Dir dir)
+ {
+ return () -> iterator(btree, dir);
+ }
+
+ /**
+ * Returns an {@code Iterable} view of a sub-range of the provided tree; each call to its
+ * {@code iterator()} creates a new iterator via {@link #iterator(Object[], int, int, Dir)}.
+ *
+ * @param btree the tree to iterate over
+ * @param lb lower bound of the range, passed through to the iterator
+ * @param ub upper bound of the range, passed through to the iterator
+ * @param dir direction of iteration
+ */
+ public static <V> Iterable<V> iterable(Object[] btree, int lb, int ub, Dir dir)
+ {
+ return () -> iterator(btree, lb, ub, dir);
+ }
+
/**
* Returns an Iterator over the entire tree
*
- * @param btree the tree to iterate over
- * @param forwards if false, the iterator will start at the end and move backwards
+ * @param btree the tree to iterate over
+ * @param dir direction of iteration
* @param <V>
* @return
*/
- public static <V> BTreeSearchIterator<V, V> slice(Object[] btree, Comparator<? super V> comparator, boolean forwards)
+ public static <K, V> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, Dir dir)
{
- return new BTreeSearchIterator<>(btree, comparator, forwards);
+ return new BTreeSearchIterator<>(btree, comparator, dir);
}
/**
@@ -186,12 +242,12 @@ public class BTree
* @param comparator the comparator that defines the ordering over the items in the tree
* @param start the beginning of the range to return, inclusive (in ascending order)
* @param end the end of the range to return, exclusive (in ascending order)
- * @param forwards if false, the iterator will start at the last item and move backwards
+ * @param dir direction of iteration
* @return an Iterator over the defined sub-range of the tree
*/
- public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, K end, boolean forwards)
+ public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, K end, Dir dir)
{
- return slice(btree, comparator, start, true, end, false, forwards);
+ return slice(btree, comparator, start, true, end, false, dir);
}
/**
@@ -201,10 +257,10 @@ public class BTree
* @param startInclusive inclusivity of lower bound
* @param end high bound of the range
* @param endInclusive inclusivity of higher bound
- * @param forwards if false, the iterator will start at end and move backwards
- * @return an Iterator over the defined sub-range of the tree
+ * @param dir direction of iteration
+ * @return an Iterator over the defined sub-range of the tree
*/
- public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, boolean startInclusive, K end, boolean endInclusive, boolean forwards)
+ public static <K, V extends K> BTreeSearchIterator<K, V> slice(Object[] btree, Comparator<? super K> comparator, K start, boolean startInclusive, K end, boolean endInclusive, Dir dir)
{
int inclusiveLowerBound = max(0,
start == null ? Integer.MIN_VALUE
@@ -214,14 +270,61 @@ public class BTree
end == null ? Integer.MAX_VALUE
: endInclusive ? floorIndex(btree, comparator, end)
: lowerIndex(btree, comparator, end));
- return new BTreeSearchIterator<>(btree, comparator, forwards, inclusiveLowerBound, inclusiveUpperBound);
+ return new BTreeSearchIterator<>(btree, comparator, dir, inclusiveLowerBound, inclusiveUpperBound);
+ }
+
+ /**
+ * @return the item in the tree that sorts as equal to the search argument, or null if no such item
+ */
+ public static <V> V find(Object[] node, Comparator<? super V> comparator, V find)
+ {
+ while (true)
+ {
+ int keyEnd = getKeyEnd(node);
+ int i = Arrays.binarySearch((V[]) node, 0, keyEnd, find, comparator);
+
+ if (i >= 0)
+ return (V) node[i];
+
+ if (isLeaf(node))
+ return null;
+
+ i = -1 - i;
+ node = (Object[]) node[keyEnd + i];
+ }
+ }
+
+ /**
+ * Modifies the provided btree directly. THIS SHOULD NOT BE USED WITHOUT EXTREME CARE as BTrees are meant to be immutable.
+ * Finds and replaces the provided item in the tree. Both should sort as equal to each other (although this is not enforced)
+ */
+ public static <V> void replaceInSitu(Object[] node, Comparator<? super V> comparator, V find, V replace)
+ {
+ while (true)
+ {
+ int keyEnd = getKeyEnd(node);
+ int i = Arrays.binarySearch((V[]) node, 0, keyEnd, find, comparator);
+
+ if (i >= 0)
+ {
+ assert find == node[i];
+ node[i] = replace;
+ return;
+ }
+
+ if (isLeaf(node))
+ throw new NoSuchElementException();
+
+ i = -1 - i;
+ node = (Object[]) node[keyEnd + i];
+ }
}
/**
* Honours result semantics of {@link Arrays#binarySearch}, as though it were performed on the tree flattened into an array
* @return index of item in tree, or <tt>(-(<i>insertion point</i>) - 1)</tt> if not present
*/
- public static <V> int findIndex(Object[] node, Comparator<V> comparator, V find)
+ public static <V> int findIndex(Object[] node, Comparator<? super V> comparator, V find)
{
int lb = 0;
while (true)
@@ -291,7 +394,6 @@ public class BTree
* (having less height, and operating over only primitive arrays), and the clarity is compelling
*/
-
public static <V> int lowerIndex(Object[] btree, Comparator<? super V> comparator, V find)
{
int i = findIndex(btree, comparator, find);
@@ -425,6 +527,16 @@ public class BTree
return ((int[]) tree[length - 1])[(length / 2) - 1];
}
+ public static long sizeOfStructureOnHeap(Object[] tree)
+ {
+ long size = ObjectSizes.sizeOfArray(tree);
+ if (isLeaf(tree))
+ return size;
+ for (int i = getChildStart(tree) ; i < getChildEnd(tree) ; i++)
+ size += sizeOfStructureOnHeap((Object[]) tree[i]);
+ return size;
+ }
+
// returns true if the provided node is a leaf, false if it is a branch
static boolean isLeaf(Object[] node)
{
@@ -485,6 +597,81 @@ public class BTree
return newTargetOffset - targetOffset;
}
+ // simple class for avoiding duplicate transformation work
+ private static class FiltrationTracker<V> implements Function<V, V>
+ {
+ final Function<? super V, ? extends V> wrapped;
+ int index;
+ boolean failed;
+
+ private FiltrationTracker(Function<? super V, ? extends V> wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public V apply(V i)
+ {
+ V o = wrapped.apply(i);
+ if (o != null) index++;
+ else failed = true;
+ return o;
+ }
+ }
+
+ /**
+ * Takes a btree and transforms it using the provided function, filtering out any null results.
+ * The result of any transformation must sort identically wrt the other results as their originals
+ */
+ public static <V> Object[] transformAndFilter(Object[] btree, Function<? super V, ? extends V> function)
+ {
+ if (isEmpty(btree))
+ return btree;
+
+ // TODO: can be made more efficient
+ FiltrationTracker<V> wrapped = new FiltrationTracker<>(function);
+ Object[] result = transformAndFilter(btree, wrapped);
+ if (!wrapped.failed)
+ return result;
+
+ // take the already transformed bits from the head of the partial result
+ Iterable<V> head = iterable(result, 0, wrapped.index - 1, Dir.ASC);
+ // and concatenate with remainder of original tree, with transformation applied
+ Iterable<V> remainder = iterable(btree, wrapped.index + 1, size(btree) - 1, Dir.ASC);
+ remainder = filter(transform(remainder, function), (x) -> x != null);
+ Iterable<V> build = concat(head, remainder);
+
+ return buildInternal(build, -1, UpdateFunction.<V>noOp());
+ }
+
+ private static <V> Object[] transformAndFilter(Object[] btree, FiltrationTracker<V> function)
+ {
+ Object[] result = btree;
+ boolean isLeaf = isLeaf(btree);
+ int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree);
+ int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
+ for (int i = 0 ; i < limit ; i++)
+ {
+ // we want to visit in iteration order, so we visit our key nodes in between our children
+ int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0);
+ Object current = btree[idx];
+ Object updated = idx < childOffset ? function.apply((V) current) : transformAndFilter((Object[]) current, function);
+ if (updated != current)
+ {
+ if (result == btree)
+ result = btree.clone();
+ result[idx] = updated;
+ }
+ if (function.failed)
+ return result;
+ }
+ return result;
+ }
+
+ public static boolean equals(Object[] a, Object[] b)
+ {
+ return Iterators.elementsEqual(iterator(a), iterator(b));
+ }
+
/**
* tree index => index of key wrt all items in the tree laid out serially
*
@@ -553,28 +740,60 @@ public class BTree
public static class Builder<V>
{
+ public static interface Resolver
+ {
+ // can return a different output type to input, so long as sort order is maintained
+ // if a resolver is present, this method will be called for every sequence of equal inputs
+ // even those with only one item
+ Object resolve(Object[] array, int lb, int ub);
+ }
- final Comparator<? super V> comparator;
+ Comparator<? super V> comparator;
Object[] values = new Object[10];
int count;
- boolean detected; // true if we have managed to cheaply ensure sorted + filtered as we have added
+ boolean detected; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
+ boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
protected Builder(Comparator<? super V> comparator)
{
this.comparator = comparator;
}
+ public void reuse()
+ {
+ reuse(comparator);
+ }
+
+ public void reuse(Comparator<? super V> comparator)
+ {
+ this.comparator = comparator;
+ count = 0;
+ detected = true;
+ }
+
+ public Builder<V> auto(boolean auto)
+ {
+ this.auto = auto;
+ return this;
+ }
+
public Builder<V> add(V v)
{
if (count == values.length)
values = Arrays.copyOf(values, count * 2);
values[count++] = v;
- if (detected && count > 1)
+ if (auto && detected && count > 1)
{
int c = comparator.compare((V) values[count - 2], (V) values[count - 1]);
- if (c == 0) count--;
- else if (c > 0) detected = false;
+ if (c == 0 && auto)
+ {
+ count--;
+ }
+ else if (c > 0)
+ {
+ detected = false;
+ }
}
return this;
@@ -582,9 +801,10 @@ public class BTree
public Builder<V> addAll(Collection<V> add)
{
- if (add instanceof SortedSet && equalComparators(comparator, ((SortedSet) add).comparator()))
+ if (auto && add instanceof SortedSet && equalComparators(comparator, ((SortedSet) add).comparator()))
{
// if we're a SortedSet, permit quick order-preserving addition of items
+ // if we collect all duplicates, don't bother as merge will necessarily be more expensive than sorting at end
return mergeAll(add, add.size());
}
detected = false;
@@ -606,15 +826,15 @@ public class BTree
}
// iter must be in sorted order!
- public Builder<V> mergeAll(Iterable<V> add, int addCount)
+ private Builder<V> mergeAll(Iterable<V> add, int addCount)
{
+ assert auto;
// ensure the existing contents are in order
- sortAndFilter();
+ autoEnforce();
int curCount = count;
// we make room for curCount * 2 + addCount, so that we can copy the current values to the end
// if necessary for continuing the merge, and have the new values directly after the current value range
- // i.e. []
if (values.length < curCount * 2 + addCount)
values = Arrays.copyOf(values, max(curCount * 2 + addCount, curCount * 3));
@@ -633,18 +853,20 @@ public class BTree
return mergeAll(addCount);
}
- // iter must be in sorted order!
private Builder<V> mergeAll(int addCount)
{
- // start optimistically by assuming new values are superset of current, and just run until this fails to hold
Object[] a = values;
int addOffset = count;
int i = 0, j = addOffset;
int curEnd = addOffset, addEnd = addOffset + addCount;
+
+ // save time in cases where we already have a subset, by skipping forwards
while (i < curEnd && j < addEnd)
{
- int c = comparator.compare((V) a[i], (V) a[j]);
+ V ai = (V) a[i], aj = (V) a[j];
+ // in some cases, such as Columns, we may have identity supersets, so perform a cheap object-identity check
+ int c = ai == aj ? 0 : comparator.compare(ai, aj);
if (c > 0)
break;
else if (c == 0)
@@ -698,11 +920,18 @@ public class BTree
return count == 0;
}
- private void sortAndFilter()
+ public Builder<V> sort()
+ {
+ Arrays.sort((V[]) values, 0, count, comparator);
+ return this;
+ }
+
+ // automatically enforce sorted+filtered
+ private void autoEnforce()
{
if (!detected && count > 1)
{
- Arrays.sort((V[]) values, 0, count, comparator);
+ sort();
int c = 1;
for (int i = 1 ; i < count ; i++)
if (comparator.compare((V) values[i], (V) values[i - 1]) != 0)
@@ -712,9 +941,30 @@ public class BTree
detected = true;
}
+ public Builder<V> resolve(Resolver resolver)
+ {
+ if (count > 0)
+ {
+ int c = 0;
+ int prev = 0;
+ for (int i = 1 ; i < count ; i++)
+ {
+ if (comparator.compare((V) values[i], (V) values[prev]) != 0)
+ {
+ values[c++] = resolver.resolve((V[]) values, prev, i);
+ prev = i;
+ }
+ }
+ values[c++] = resolver.resolve((V[]) values, prev, count);
+ count = c;
+ }
+ return this;
+ }
+
public Object[] build()
{
- sortAndFilter();
+ if (auto)
+ autoEnforce();
return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index d6e6fae..6d023d2 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -42,15 +42,15 @@ public class BTreeSearchIterator<K, V> extends TreeCursor<K> implements IndexedS
private static final int LAST = 4; // may co-exist with ON_ITEM, in which case we are also at END
private static final int END = 5; // equal to LAST | ON_ITEM
- public BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, boolean forwards)
+ public BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir)
{
- this(btree, comparator, forwards, 0, size(btree)-1);
+ this(btree, comparator, dir, 0, size(btree)-1);
}
- BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, boolean forwards, int lowerBound, int upperBound)
+ BTreeSearchIterator(Object[] btree, Comparator<? super K> comparator, BTree.Dir dir, int lowerBound, int upperBound)
{
super(comparator, btree);
- this.forwards = forwards;
+ this.forwards = dir == BTree.Dir.ASC;
this.lowerBound = lowerBound;
this.upperBound = upperBound;
rewind();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index 7646693..03fa1ec 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
+import org.apache.cassandra.utils.btree.BTree.Dir;
+
import static org.apache.cassandra.utils.btree.BTree.findIndex;
import static org.apache.cassandra.utils.btree.BTree.lower;
import static org.apache.cassandra.utils.btree.BTree.toArray;
@@ -50,9 +52,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
return comparator;
}
- protected BTreeSearchIterator<V, V> slice(boolean forwards)
+ protected BTreeSearchIterator<V, V> slice(Dir dir)
{
- return BTree.slice(tree, comparator, forwards);
+ return BTree.slice(tree, comparator, dir);
}
public Object[] tree()
@@ -101,13 +103,13 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
@Override
public BTreeSearchIterator<V, V> iterator()
{
- return slice(true);
+ return slice(Dir.ASC);
}
@Override
public BTreeSearchIterator<V, V> descendingIterator()
{
- return slice(false);
+ return slice(Dir.DESC);
}
@Override
@@ -360,9 +362,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
}
@Override
- protected BTreeSearchIterator<V, V> slice(boolean forwards)
+ protected BTreeSearchIterator<V, V> slice(Dir dir)
{
- return new BTreeSearchIterator<>(tree, comparator, forwards, lowerBound, upperBound);
+ return new BTreeSearchIterator<>(tree, comparator, dir, lowerBound, upperBound);
}
@Override
@@ -485,9 +487,9 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
}
@Override
- protected BTreeSearchIterator<V, V> slice(boolean forwards)
+ protected BTreeSearchIterator<V, V> slice(Dir dir)
{
- return super.slice(!forwards);
+ return super.slice(dir.invert());
}
/* Flip the methods we call for inequality searches */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java b/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
index 83fc661..bba36c3 100644
--- a/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/NodeBuilder.java
@@ -263,7 +263,8 @@ final class NodeBuilder
// builds a new root BTree node - must be called on root of operation
Object[] toNode()
{
- assert buildKeyPosition <= FAN_FACTOR && (buildKeyPosition > 0 || getKeyEnd(copyFrom) > 0) : buildKeyPosition;
+ // we permit building empty trees as some constructions do not know in advance how many items they will contain
+ assert buildKeyPosition <= FAN_FACTOR : buildKeyPosition;
return buildFromRange(0, buildKeyPosition, isLeaf(copyFrom), false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeCursor.java b/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
index 9e946f9..8523810 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeCursor.java
@@ -124,8 +124,8 @@ class TreeCursor<K> extends NodeCursor<K>
int cmp;
if (key == test) cmp = 0; // check object identity first, since we utilise that in some places and it's very cheap
- else cmp = comparator.compare(key, test);
- if (forwards ? cmp <= 0 : cmp >= 0)
+ else cmp = comparator.compare(test, key); // order of provision matters for asymmetric comparators
+ if (forwards ? cmp >= 0 : cmp <= 0)
{
// we've either matched, or excluded the value from being present
int index = cur.globalIndex();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index a470527..8fd470f 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.rows.ArrayBackedRow;
+import org.apache.cassandra.db.rows.BTreeBackedRow;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -51,13 +51,13 @@ public abstract class AbstractAllocator
return new CloningArrayBackedRowBuilder(columns, this);
}
- private static class CloningArrayBackedRowBuilder extends ArrayBackedRow.SortedBuilder
+ private static class CloningArrayBackedRowBuilder extends BTreeBackedRow.Builder
{
private final AbstractAllocator allocator;
private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator)
{
- super(columns);
+ super(columns, true);
this.allocator = allocator;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
index 60116b9..37866d2 100644
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -43,15 +43,21 @@ import com.codahale.metrics.Timer;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.btree.*;
+import static com.google.common.base.Predicates.notNull;
import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.transform;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.reverseOrder;
+import static org.apache.cassandra.utils.btree.BTree.iterable;
+import static org.junit.Assert.assertTrue;
public class LongBTreeTest
{
private static int perThreadTrees = 10000;
- private static final boolean DEBUG = false;
+ private static int minTreeSize = 5;
+ private static int maxTreeSize = 15;
+ private static final boolean DEBUG = true;
private static final MetricRegistry metrics = new MetricRegistry();
private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
@@ -108,36 +114,36 @@ public class LongBTreeTest
{
final int perTreeSelections = 2;
testRandomSelectionOfSet(perThreadTrees, perTreeSelections,
- (test, canonical) -> {
- if (!canonical.isEmpty() || !test.isEmpty())
- {
- Assert.assertEquals(canonical.isEmpty(), test.isEmpty());
- Assert.assertEquals(canonical.first(), test.first());
- Assert.assertEquals(canonical.last(), test.last());
- }
- return (key) ->
- {
- Assert.assertEquals(test.ceiling(key), canonical.ceiling(key));
- Assert.assertEquals(test.higher(key), canonical.higher(key));
- Assert.assertEquals(test.floor(key), canonical.floor(key));
- Assert.assertEquals(test.lower(key), canonical.lower(key));
- };
- });
+ (test, canonical) -> {
+ if (!canonical.isEmpty() || !test.isEmpty())
+ {
+ Assert.assertEquals(canonical.isEmpty(), test.isEmpty());
+ Assert.assertEquals(canonical.first(), test.first());
+ Assert.assertEquals(canonical.last(), test.last());
+ }
+ return (key) ->
+ {
+ Assert.assertEquals(test.ceiling(key), canonical.ceiling(key));
+ Assert.assertEquals(test.higher(key), canonical.higher(key));
+ Assert.assertEquals(test.floor(key), canonical.floor(key));
+ Assert.assertEquals(test.lower(key), canonical.lower(key));
+ };
+ });
}
@Test
public void testListIndexes() throws InterruptedException
{
testRandomSelectionOfList(perThreadTrees, 4,
- (test, canonical, cmp) ->
- (key) ->
- {
- int javaIndex = Collections.binarySearch(canonical, key, cmp);
- int btreeIndex = test.indexOf(key);
- Assert.assertEquals(javaIndex, btreeIndex);
- if (javaIndex >= 0)
- Assert.assertEquals(canonical.get(javaIndex), test.get(btreeIndex));
- }
+ (test, canonical, cmp) ->
+ (key) ->
+ {
+ int javaIndex = Collections.binarySearch(canonical, key, cmp);
+ int btreeIndex = test.indexOf(key);
+ Assert.assertEquals(javaIndex, btreeIndex);
+ if (javaIndex >= 0)
+ Assert.assertEquals(canonical.get(javaIndex), test.get(btreeIndex));
+ }
);
}
@@ -145,14 +151,81 @@ public class LongBTreeTest
public void testToArray() throws InterruptedException
{
testRandomSelection(perThreadTrees, 4,
- (selection) ->
- {
- Integer[] array = new Integer[selection.canonicalList.size() + 1];
- selection.testAsList.toArray(array, 1);
- Assert.assertEquals(null, array[0]);
- for (int j = 0 ; j < selection.canonicalList.size() ; j++)
- Assert.assertEquals(selection.canonicalList.get(j), array[j + 1]);
- });
+ (selection) ->
+ {
+ Integer[] array = new Integer[selection.canonicalList.size() + 1];
+ selection.testAsList.toArray(array, 1);
+ Assert.assertEquals(null, array[0]);
+ for (int j = 0; j < selection.canonicalList.size(); j++)
+ Assert.assertEquals(selection.canonicalList.get(j), array[j + 1]);
+ });
+ }
+
+ private static final class CountingFunction implements Function<Integer, Integer>
+ {
+ final Function<Integer, Integer> wrapped;
+ int count = 0;
+ protected CountingFunction(Function<Integer, Integer> wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+ public Integer apply(Integer integer)
+ {
+ count++;
+ return wrapped.apply(integer);
+ }
+ }
+
+ @Test
+ public void testTransformAndFilter() throws InterruptedException
+ {
+ testRandomSelection(perThreadTrees, 4, false, false, false,
+ (selection) ->
+ {
+ Map<Integer, Integer> update = new LinkedHashMap<>();
+ for (Integer i : selection.testKeys)
+ update.put(i, new Integer(i));
+
+ CountingFunction function;
+ Object[] original = selection.testAsSet.tree();
+ Object[] transformed;
+
+ // test replacing none, leaving all present
+ function = new CountingFunction((x) -> x);
+ transformed = BTree.transformAndFilter(original, function);
+ Assert.assertEquals(BTree.size(original), function.count);
+ Assert.assertSame(original, transformed);
+
+ // test replacing some, leaving all present
+ function = new CountingFunction((x) -> update.containsKey(x) ? update.get(x) : x);
+ transformed = BTree.transformAndFilter(original, function);
+ Assert.assertEquals(BTree.size(original), function.count);
+ assertSame(transform(selection.canonicalList, function.wrapped), iterable(transformed));
+
+ // test replacing some, removing some
+ function = new CountingFunction(update::get);
+ transformed = BTree.transformAndFilter(original, function);
+ Assert.assertEquals(BTree.size(original), function.count);
+ assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
+
+ // test replacing none, removing some
+ function = new CountingFunction((x) -> update.containsKey(x) ? null : x);
+ transformed = BTree.transformAndFilter(selection.testAsList.tree(), function);
+ Assert.assertEquals(BTree.size(original), function.count);
+ assertSame(filter(transform(selection.canonicalList, function.wrapped), notNull()), iterable(transformed));
+ });
+ }
+
+ private static void assertSame(Iterable<Integer> i1, Iterable<Integer> i2)
+ {
+ assertSame(i1.iterator(), i2.iterator());
+ }
+
+ private static void assertSame(Iterator<Integer> i1, Iterator<Integer> i2)
+ {
+ while (i1.hasNext() && i2.hasNext())
+ Assert.assertSame(i1.next(), i2.next());
+ Assert.assertEquals(i1.hasNext(), i2.hasNext());
}
private void testRandomSelectionOfList(int perThreadTrees, int perTreeSelections, BTreeListTestFactory testRun) throws InterruptedException
@@ -195,8 +268,14 @@ public class LongBTreeTest
testEachKey.testOne(key);
});
}
+
private void testRandomSelection(int perThreadTrees, int perTreeSelections, Consumer<RandomSelection> testRun) throws InterruptedException
{
+ testRandomSelection(perThreadTrees, perTreeSelections, true, true, true, testRun);
+ }
+
+ private void testRandomSelection(int perThreadTrees, int perTreeSelections, boolean narrow, boolean mixInNotPresentItems, boolean permitReversal, Consumer<RandomSelection> testRun) throws InterruptedException
+ {
int threads = Runtime.getRuntime().availableProcessors();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicLong errors = new AtomicLong();
@@ -212,10 +291,10 @@ public class LongBTreeTest
{
for (int i = 0 ; i < perThreadTrees ; i++)
{
- RandomTree tree = randomTree(100, 10000);
+ RandomTree tree = randomTree(minTreeSize, maxTreeSize);
for (int j = 0 ; j < perTreeSelections ; j++)
{
- testRun.accept(tree.select());
+ testRun.accept(tree.select(narrow, mixInNotPresentItems, permitReversal));
count.incrementAndGet();
}
}
@@ -273,7 +352,7 @@ public class LongBTreeTest
this.test = test;
}
- RandomSelection select()
+ RandomSelection select(boolean narrow, boolean mixInNotPresentItems, boolean permitReversal)
{
ThreadLocalRandom random = ThreadLocalRandom.current();
NavigableSet<Integer> canonicalSet = this.canonical;
@@ -285,11 +364,11 @@ public class LongBTreeTest
Assert.assertEquals(canonicalList.size(), testAsList.size());
// sometimes select keys first, so we cover full range
- List<Integer> allKeys = randomKeys(canonical);
+ List<Integer> allKeys = randomKeys(canonical, mixInNotPresentItems);
List<Integer> keys = allKeys;
- int narrow = random.nextInt(3);
- while (canonicalList.size() > 10 && keys.size() > 10 && narrow-- > 0)
+ int narrowCount = random.nextInt(3);
+ while (narrow && canonicalList.size() > 10 && keys.size() > 10 && narrowCount-- > 0)
{
boolean useLb = random.nextBoolean();
boolean useUb = random.nextBoolean();
@@ -297,7 +376,7 @@ public class LongBTreeTest
continue;
// select a range smaller than the total span when we have more narrowing iterations left
- int indexRange = keys.size() / (narrow + 1);
+ int indexRange = keys.size() / (narrowCount + 1);
boolean lbInclusive = true;
Integer lbKey = canonicalList.get(0);
@@ -354,7 +433,7 @@ public class LongBTreeTest
keys = allKeys;
Comparator<Integer> comparator = naturalOrder();
- if (random.nextBoolean())
+ if (permitReversal && random.nextBoolean())
{
if (allKeys != keys)
keys = new ArrayList<>(keys);
@@ -478,10 +557,10 @@ public class LongBTreeTest
// select a random subset of the keys, with an optional random population of keys inbetween those that are present
// return a value with the search position
- private static List<Integer> randomKeys(Iterable<Integer> canonical)
+ private static List<Integer> randomKeys(Iterable<Integer> canonical, boolean mixInNotPresentItems)
{
ThreadLocalRandom rnd = ThreadLocalRandom.current();
- boolean useFake = rnd.nextBoolean();
+ boolean useFake = mixInNotPresentItems && rnd.nextBoolean();
final float fakeRatio = rnd.nextFloat();
List<Integer> results = new ArrayList<>();
Long fakeLb = null, fakeUb = null;
@@ -525,8 +604,8 @@ public class LongBTreeTest
btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp());
canon.add(Integer.MIN_VALUE);
canon.add(Integer.MAX_VALUE);
- Assert.assertTrue(BTree.isWellFormed(btree, naturalOrder()));
- testEqual("Oversize", BTree.<Integer>slice(btree, naturalOrder(), true), canon.iterator());
+ assertTrue(BTree.isWellFormed(btree, naturalOrder()));
+ testEqual("Oversize", BTree.iterator(btree), canon.iterator());
}
@Test
@@ -652,7 +731,7 @@ public class LongBTreeTest
throw new AssertionError("Not well formed!");
}
if (quickEquality)
- testEqual("", BTree.<Integer>slice(btree, naturalOrder(), true), canon.keySet().iterator());
+ testEqual("", BTree.iterator(btree), canon.keySet().iterator());
else
r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 9a97483..373cf6a 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -127,7 +127,7 @@ public class RowTest
ColumnDefinition defA = cfm.getColumnDefinition(new ColumnIdentifier("a", true));
ColumnDefinition defB = cfm.getColumnDefinition(new ColumnIdentifier("b", true));
- Row.Builder builder = ArrayBackedRow.unsortedBuilder(cfm.partitionColumns().regulars, nowInSeconds);
+ Row.Builder builder = BTreeBackedRow.unsortedBuilder(cfm.partitionColumns().regulars, nowInSeconds);
builder.newRow(cfm.comparator.make("c1"));
writeSimpleCellValue(builder, cfm, defA, "a1", 0);
writeSimpleCellValue(builder, cfm, defA, "a2", 1);
@@ -152,7 +152,7 @@ public class RowTest
Cell cell = BufferCell.expiring(def, 0, ttl, nowInSeconds, ((AbstractType) def.cellValueType()).decompose("a1"));
- PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, ArrayBackedRow.singleCellRow(cfm.comparator.make("c1"), cell));
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(cfm, dk, BTreeBackedRow.singleCellRow(cfm.comparator.make("c1"), cell));
new Mutation(update).applyUnsafe();
// when we read with a nowInSeconds before the cell has expired,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
index e6d8cb0..54bb344 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowAndDeletionMergeIteratorTest.java
@@ -364,7 +364,7 @@ public class RowAndDeletionMergeIteratorTest
private void addRow(PartitionUpdate update, int col1, int a)
{
- update.add(ArrayBackedRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
+ update.add(BTreeBackedRow.singleCellRow(update.metadata().comparator.make(col1), makeCell(cfm, defA, a, 0)));
}
private Cell makeCell(CFMetaData cfm, ColumnDefinition columnDefinition, int value, long timestamp)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index fd775b4..c16365a 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -371,7 +371,7 @@ public class UnfilteredRowIteratorsMergeTest
{
final Clustering clustering = clusteringFor(pos);
final LivenessInfo live = LivenessInfo.create(metadata, timeGenerator.apply(pos), nowInSec);
- return ArrayBackedRow.noCellLiveRow(clustering, live);
+ return BTreeBackedRow.noCellLiveRow(clustering, live);
}
private void dumpList(List<Unfiltered> list)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
index df4b08a..316a23c 100644
--- a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java
@@ -268,7 +268,7 @@ public class TriggerExecutorTest
private static PartitionUpdate makeCf(CFMetaData metadata, String key, String columnValue1, String columnValue2)
{
- Row.Builder builder = ArrayBackedRow.unsortedBuilder(metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
+ Row.Builder builder = BTreeBackedRow.unsortedBuilder(metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
builder.newRow(Clustering.EMPTY);
long ts = FBUtilities.timestampMicros();
if (columnValue1 != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/test/unit/org/apache/cassandra/utils/BTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java
index c004836..ec4c359 100644
--- a/test/unit/org/apache/cassandra/utils/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.utils;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
+import com.google.common.collect.Iterables;
import org.junit.Test;
+import junit.framework.Assert;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -191,9 +193,80 @@ public class BTreeTest
assertEquals(1, monitor.getNumberOfCalls(3));
}
+ /**
+ * Tests that a <code>BTree.Builder</code> given a <code>Resolver</code> collapses each run of repeated values into a single resolved value (here their sum), for both sorted and shuffled input.
+ */
+ @Test
+ public void testBuilder_Resolver()
+ {
+ // for numbers x in 1..N, we repeat x x times, and resolve values to their sum,
+ // so that the resulting tree is of square numbers
+ BTree.Builder.Resolver resolver = (array, lb, ub) -> {
+ int sum = 0;
+ for (int i = lb ; i < ub ; i++)
+ sum += (Integer) array[i];
+ return sum;
+ };
+
+ for (int count = 0 ; count < 10 ; count ++)
+ {
+ BTree.Builder<Integer> builder;
+ // first check we produce the right output for sorted input
+ List<Integer> sorted = resolverInput(count, false);
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.auto(false);
+ for (Integer i : sorted)
+ builder.add(i);
+ // for sorted input, check non-resolve path works before checking resolution path
+ Assert.assertTrue(Iterables.elementsEqual(sorted, BTree.iterable(builder.build())));
+ checkResolverOutput(count, builder.resolve(resolver).build());
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.auto(false);
+ for (int i = 0 ; i < 10 ; i++)
+ {
+ // now do a few runs of randomized inputs
+ for (Integer j : resolverInput(count, true))
+ builder.add(j);
+ checkResolverOutput(count, builder.sort().resolve(resolver).build());
+ builder.reuse();
+ }
+ }
+ }
+
+ private static List<Integer> resolverInput(int count, boolean shuffled)
+ {
+ List<Integer> result = new ArrayList<>();
+ for (int i = 1 ; i <= count ; i++)
+ for (int j = 0 ; j < i ; j++)
+ result.add(i);
+ if (shuffled)
+ {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0 ; i < result.size() ; i++)
+ {
+ int swapWith = random.nextInt(i, result.size());
+ Integer t = result.get(swapWith);
+ result.set(swapWith, result.get(i));
+ result.set(i, t);
+ }
+ }
+ return result;
+ }
+
+ private static void checkResolverOutput(int count, Object[] btree)
+ {
+ int i = 1;
+ for (Integer current : BTree.<Integer>iterable(btree))
+ {
+ Assert.assertEquals(i * i, current.intValue());
+ i++;
+ }
+ Assert.assertEquals(i, count + 1);
+ }
+
private static void checkResult(int count, Object[] btree)
{
- Iterator<Integer> iter = BTree.slice(btree, CMP, true);
+ Iterator<Integer> iter = BTree.slice(btree, CMP, BTree.Dir.ASC);
int i = 0;
while (iter.hasNext())
assertEquals(iter.next(), ints[i++]);
[2/2] cassandra git commit: Use BTree to back default Row and
ComplexColumnData objects
Posted by be...@apache.org.
Use BTree to back default Row and ComplexColumnData objects
patch by benedict; reviewed by branimir for CASSANDRA-9888
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/639d4b24
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/639d4b24
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/639d4b24
Branch: refs/heads/trunk
Commit: 639d4b240c084900b6589222a0984babfc1890b1
Parents: de420e5
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 23 14:22:16 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Jul 28 15:00:27 2015 +0100
----------------------------------------------------------------------
.../cassandra/config/ColumnDefinition.java | 16 +
.../apache/cassandra/cql3/UpdateParameters.java | 4 +-
.../cassandra/db/HintedHandOffManager.java | 4 +-
.../org/apache/cassandra/db/LegacyLayout.java | 4 +-
.../apache/cassandra/db/RowUpdateBuilder.java | 6 +-
.../cassandra/db/UnfilteredDeserializer.java | 2 +-
.../AbstractSimplePerColumnSecondaryIndex.java | 4 +-
.../db/index/composites/CompositesIndex.java | 2 +-
.../AbstractThreadUnsafePartition.java | 2 +-
.../db/partitions/AtomicBTreePartition.java | 7 +-
.../db/partitions/PartitionUpdate.java | 4 +-
.../apache/cassandra/db/rows/AbstractCell.java | 8 +-
.../cassandra/db/rows/ArrayBackedRow.java | 927 -------------------
.../cassandra/db/rows/BTreeBackedRow.java | 535 +++++++++++
.../apache/cassandra/db/rows/BufferCell.java | 9 +-
src/java/org/apache/cassandra/db/rows/Cell.java | 35 +-
.../apache/cassandra/db/rows/ColumnData.java | 25 +-
.../cassandra/db/rows/ComplexColumnData.java | 208 ++---
src/java/org/apache/cassandra/db/rows/Row.java | 6 +-
src/java/org/apache/cassandra/db/rows/Rows.java | 4 +-
.../rows/UnfilteredRowIteratorSerializer.java | 2 +-
.../db/rows/UnfilteredRowIterators.java | 2 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 2 +-
.../io/sstable/SSTableSimpleIterator.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 2 +-
.../cassandra/thrift/CassandraServer.java | 9 +-
.../cassandra/thrift/ThriftResultsMerger.java | 4 +-
.../org/apache/cassandra/utils/btree/BTree.java | 320 ++++++-
.../utils/btree/BTreeSearchIterator.java | 8 +-
.../apache/cassandra/utils/btree/BTreeSet.java | 18 +-
.../cassandra/utils/btree/NodeBuilder.java | 3 +-
.../cassandra/utils/btree/TreeCursor.java | 4 +-
.../utils/memory/AbstractAllocator.java | 6 +-
.../apache/cassandra/utils/LongBTreeTest.java | 173 +++-
test/unit/org/apache/cassandra/db/RowTest.java | 4 +-
.../rows/RowAndDeletionMergeIteratorTest.java | 2 +-
.../rows/UnfilteredRowIteratorsMergeTest.java | 2 +-
.../cassandra/triggers/TriggerExecutorTest.java | 2 +-
.../org/apache/cassandra/utils/BTreeTest.java | 75 +-
39 files changed, 1200 insertions(+), 1252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 54a00f5..8d8a929 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.exceptions.*;
public class ColumnDefinition extends ColumnSpecification implements Comparable<ColumnDefinition>
{
+ public static final Comparator<Object> asymmetricColumnDataComparator = (a, b) -> ((ColumnData) a).column().compareTo((ColumnDefinition) b);
+
/*
* The type of CQL3 column this definition represents.
* There is 4 main type of CQL3 columns: those parts of the partition key,
@@ -70,6 +72,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
private final Integer componentIndex;
private final Comparator<CellPath> cellPathComparator;
+ private final Comparator<Object> asymmetricCellPathComparator;
+ private final Comparator<? super Cell> cellComparator;
/**
* These objects are compared frequently, so we encode several of their comparison components
@@ -156,6 +160,8 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
this.componentIndex = componentIndex;
this.setIndexType(indexType, indexOptions);
this.cellPathComparator = makeCellPathComparator(kind, validator);
+ this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
+ this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
this.comparisonOrder = comparisonOrder(kind, isComplex(), position());
}
@@ -407,6 +413,16 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
return cellPathComparator;
}
+ public Comparator<Object> asymmetricCellPathComparator()
+ {
+ return asymmetricCellPathComparator;
+ }
+
+ public Comparator<? super Cell> cellComparator()
+ {
+ return cellComparator;
+ }
+
public boolean isComplex()
{
return cellPathComparator != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 8dcb7e5..519eb4b 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -120,13 +120,13 @@ public class UpdateParameters
if (clustering == Clustering.STATIC_CLUSTERING)
{
if (staticBuilder == null)
- staticBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
+ staticBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec);
builder = staticBuilder;
}
else
{
if (regularBuilder == null)
- regularBuilder = ArrayBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
+ regularBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec);
builder = regularBuilder;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 234ab97..6ff880c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -137,7 +137,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
- return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, ArrayBackedRow.singleCellRow(clustering, cell)));
+ return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeBackedRow.singleCellRow(clustering, cell)));
}
/*
@@ -181,7 +181,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
{
DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes);
Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
- PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, ArrayBackedRow.singleCellRow(clustering, cell));
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell));
new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 501dbb2..696c1c9 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -353,7 +353,7 @@ public abstract class LegacyLayout
for (ColumnDefinition column : statics)
columnsToFetch.add(column.name.bytes);
- Row.Builder builder = ArrayBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
+ Row.Builder builder = BTreeBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds());
builder.newRow(Clustering.STATIC_CLUSTERING);
boolean foundOne = false;
@@ -822,7 +822,7 @@ public abstract class LegacyLayout
this.metadata = metadata;
this.isStatic = isStatic;
this.helper = helper;
- this.builder = ArrayBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
+ this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
}
public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 71b7bd8..c06a7f7 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -80,7 +80,7 @@ public class RowUpdateBuilder
assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
assert regularBuilder == null : "Cannot add the clustering twice to the same row";
- regularBuilder = ArrayBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
+ regularBuilder = BTreeBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
regularBuilder.newRow(clustering);
// If a CQL table, add the "row marker"
@@ -105,7 +105,7 @@ public class RowUpdateBuilder
assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
if (staticBuilder == null)
{
- staticBuilder = ArrayBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
+ staticBuilder = BTreeBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
}
return staticBuilder;
@@ -186,7 +186,7 @@ public class RowUpdateBuilder
assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
- Row.Builder builder = ArrayBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
+ Row.Builder builder = BTreeBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
if (isStatic)
builder.newRow(Clustering.STATIC_CLUSTERING);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 36a372f..c00597a 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -121,7 +121,7 @@ public abstract class UnfilteredDeserializer
super(metadata, in, helper);
this.header = header;
this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
- this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
+ this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
}
public boolean hasNext() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index c3a3c08..842cbb9 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -115,7 +115,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
{
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
- Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
+ Row row = BTreeBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
@@ -132,7 +132,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
{
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
- Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
+ Row row = BTreeBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
if (logger.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index e073802..42861c5 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -112,7 +112,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
{
- Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
+ Row row = BTreeBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
index d79ab06..a716768 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -167,7 +167,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab
activeDeletion = rt.deletionTime();
if (row == null)
- return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+ return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
return row.filter(columns, activeDeletion, true, metadata);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c06ffd5..e8ec4c0 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
/**
* A thread-safe and atomic Partition implementation.
@@ -164,7 +165,7 @@ public class AtomicBTreePartition implements Partition
final Holder current = ref;
return new SearchIterator<Clustering, Row>()
{
- private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+ private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
public boolean hasNext()
@@ -188,7 +189,7 @@ public class AtomicBTreePartition implements Partition
activeDeletion = rt.deletionTime();
if (row == null)
- return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+ return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
return row.filter(columns, activeDeletion, true, metadata);
}
@@ -235,7 +236,7 @@ public class AtomicBTreePartition implements Partition
{
Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
- Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, !reversed);
+ Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
return new RowAndDeletionMergeIterator(metadata,
partitionKey,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 689b832..102008f 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -822,8 +822,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
{
// This is a bit of a giant hack as this is the only place where we mutate a Row object. This makes it more efficient
// for counters however and this won't be needed post-#6506 so that's probably fine.
- assert row instanceof ArrayBackedRow;
- ((ArrayBackedRow)row).setValue(column, path, value);
+ assert row instanceof BTreeBackedRow;
+ ((BTreeBackedRow)row).setValue(column, path, value);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 807741a..f53322a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.security.MessageDigest;
import java.util.Objects;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
@@ -32,8 +33,13 @@ import org.apache.cassandra.utils.FBUtilities;
* Unless you have a very good reason not to, every cell implementation
* should probably extend this class.
*/
-public abstract class AbstractCell implements Cell
+public abstract class AbstractCell extends Cell
{
+ protected AbstractCell(ColumnDefinition column)
+ {
+ super(column);
+ }
+
public void digest(MessageDigest digest)
{
digest.update(value().duplicate());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
deleted file mode 100644
index 12b23e1..0000000
--- a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
+++ /dev/null
@@ -1,927 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.Predicate;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Immutable implementation of a Row object.
- */
-public class ArrayBackedRow extends AbstractRow
-{
- private static final ColumnData[] NO_DATA = new ColumnData[0];
-
- private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayBackedRow(Clustering.EMPTY, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE));
-
- private final Clustering clustering;
- private final Columns columns;
- private final LivenessInfo primaryKeyLivenessInfo;
- private final DeletionTime deletion;
-
- // The data for each column present in this row, in column sorted order.
- private final int size;
- private final ColumnData[] data;
-
- // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones, and then after reconciliation to remove
- // all tombstones since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
- // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
- // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allows us at any given
- // time to know if we have any deleted information or not. If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to
- // Integer.MIN_VALUE, but if we don't and have expiring cells, this will be the time at which the first expiring cell expires. If we have no tombstones and
- // no expiring cells, this will be Integer.MAX_VALUE.
- private final int minLocalDeletionTime;
-
- private ArrayBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data, int minLocalDeletionTime)
- {
- this.clustering = clustering;
- this.columns = columns;
- this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
- this.deletion = deletion;
- this.size = size;
- this.data = data;
- this.minLocalDeletionTime = minLocalDeletionTime;
- }
-
- // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
- public static ArrayBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data)
- {
- int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
- if (minDeletionTime != Integer.MIN_VALUE)
- {
- for (int i = 0; i < size; i++)
- minDeletionTime = Math.min(minDeletionTime, minDeletionTime(data[i]));
- }
-
- return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
- }
-
- public static ArrayBackedRow emptyRow(Clustering clustering)
- {
- return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE);
- }
-
- public static ArrayBackedRow singleCellRow(Clustering clustering, Cell cell)
- {
- if (cell.column().isSimple())
- return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ cell }, minDeletionTime(cell));
-
- ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
- return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ complexData }, minDeletionTime(cell));
- }
-
- public static ArrayBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
- {
- assert !deletion.isLive();
- return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, 0, NO_DATA, Integer.MIN_VALUE);
- }
-
- public static ArrayBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
- {
- assert !primaryKeyLivenessInfo.isEmpty();
- return new ArrayBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, 0, NO_DATA, minDeletionTime(primaryKeyLivenessInfo));
- }
-
- private static int minDeletionTime(Cell cell)
- {
- return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
- }
-
- private static int minDeletionTime(LivenessInfo info)
- {
- return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
- }
-
- private static int minDeletionTime(DeletionTime dt)
- {
- return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
- }
-
- private static int minDeletionTime(ComplexColumnData cd)
- {
- int min = minDeletionTime(cd.complexDeletion());
- for (Cell cell : cd)
- min = Math.min(min, minDeletionTime(cell));
- return min;
- }
-
- private static int minDeletionTime(ColumnData cd)
- {
- return cd.column().isSimple() ? minDeletionTime((Cell)cd) : minDeletionTime((ComplexColumnData)cd);
- }
-
- public Clustering clustering()
- {
- return clustering;
- }
-
- public Columns columns()
- {
- return columns;
- }
-
- public LivenessInfo primaryKeyLivenessInfo()
- {
- return primaryKeyLivenessInfo;
- }
-
- public DeletionTime deletion()
- {
- return deletion;
- }
-
- public Cell getCell(ColumnDefinition c)
- {
- assert !c.isComplex();
- int idx = binarySearch(c);
- return idx < 0 ? null : (Cell)data[idx];
- }
-
- public Cell getCell(ColumnDefinition c, CellPath path)
- {
- assert c.isComplex();
- int idx = binarySearch(c);
- if (idx < 0)
- return null;
-
- return ((ComplexColumnData)data[idx]).getCell(path);
- }
-
- public ComplexColumnData getComplexColumnData(ColumnDefinition c)
- {
- assert c.isComplex();
- int idx = binarySearch(c);
- return idx < 0 ? null : (ComplexColumnData)data[idx];
- }
-
- public Iterator<ColumnData> iterator()
- {
- return new ColumnDataIterator();
- }
-
- public Iterable<Cell> cells()
- {
- return CellIterator::new;
- }
-
- public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
- {
- return new ColumnSearchIterator();
- }
-
- public Row filter(ColumnFilter filter, CFMetaData metadata)
- {
- return filter(filter, DeletionTime.LIVE, false, metadata);
- }
-
- public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
- {
- Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
-
- if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
- return this;
-
- boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
-
- LivenessInfo newInfo = primaryKeyLivenessInfo;
- DeletionTime newDeletion = deletion;
- if (mayHaveShadowed)
- {
- if (activeDeletion.deletes(newInfo.timestamp()))
- newInfo = LivenessInfo.EMPTY;
- // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if we don't have setActiveDeletionToRow,
- // the row deletion is shadowed and we shouldn't return it.
- newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
- }
-
- ColumnData[] newData = new ColumnData[size];
- int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
- Columns columns = filter.fetchedColumns().columns(isStatic());
- Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
- int newSize = 0;
- for (int i = 0; i < size; i++)
- {
- ColumnData cd = data[i];
- ColumnDefinition column = cd.column();
- if (!inclusionTester.test(column))
- continue;
-
- CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
- if (column.isSimple())
- {
- Cell cell = (Cell)cd;
- if ((dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)))
- {
- newData[newSize++] = cell;
- newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(cell));
- }
- }
- else
- {
- ColumnData newCd = ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
- if (newCd != null)
- {
- newData[newSize++] = newCd;
- newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(newCd));
- }
- }
- }
-
- if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
- return null;
-
- return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
- }
-
- public boolean hasComplexDeletion()
- {
- // We start from the end because we know complex columns sort after simple ones
- for (int i = size - 1; i >= 0; i--)
- {
- ColumnData cd = data[i];
- if (cd.column().isSimple())
- return false;
-
- if (!((ComplexColumnData)cd).complexDeletion().isLive())
- return true;
- }
- return false;
- }
-
- public Row markCounterLocalToBeCleared()
- {
- ColumnData[] newData = null;
- for (int i = 0; i < size; i++)
- {
- ColumnData cd = data[i];
- ColumnData newCd = cd.column().cellValueType().isCounter()
- ? cd.markCounterLocalToBeCleared()
- : cd;
- if (newCd != cd)
- {
- if (newData == null)
- newData = Arrays.copyOf(data, size);
- newData[i] = newCd;
- }
- }
-
- return newData == null
- ? this
- : new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, newData, minLocalDeletionTime);
- }
-
- public boolean hasDeletion(int nowInSec)
- {
- return nowInSec >= minLocalDeletionTime;
- }
-
- /**
- * Returns a copy of the row where all timestamps for live data have been replaced by {@code newTimestamp} and
- * all deletion timestamps by {@code newTimestamp - 1}.
- *
- * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
- */
- public Row updateAllTimestamp(long newTimestamp)
- {
- LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
- DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
-
- ColumnData[] newData = new ColumnData[size];
- for (int i = 0; i < size; i++)
- newData[i] = data[i].updateAllTimestamp(newTimestamp);
-
- return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, size, newData, minLocalDeletionTime);
- }
-
- public Row purge(DeletionPurger purger, int nowInSec)
- {
- if (!hasDeletion(nowInSec))
- return this;
-
- LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
- DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
-
- int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion));
- ColumnData[] newData = new ColumnData[size];
- int newSize = 0;
- for (int i = 0; i < size; i++)
- {
- ColumnData purged = data[i].purge(purger, nowInSec);
- if (purged != null)
- {
- newData[newSize++] = purged;
- newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(purged));
- }
- }
-
- if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
- return null;
-
- return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime);
- }
-
- public int dataSize()
- {
- int dataSize = clustering.dataSize()
- + primaryKeyLivenessInfo.dataSize()
- + deletion.dataSize();
-
- for (int i = 0; i < size; i++)
- dataSize += data[i].dataSize();
- return dataSize;
- }
-
- public long unsharedHeapSizeExcludingData()
- {
- long heapSize = EMPTY_SIZE
- + clustering.unsharedHeapSizeExcludingData()
- + ObjectSizes.sizeOfArray(data);
-
- for (int i = 0; i < size; i++)
- heapSize += data[i].unsharedHeapSizeExcludingData();
- return heapSize;
- }
-
- public static Row.Builder sortedBuilder(Columns columns)
- {
- return new SortedBuilder(columns);
- }
-
- public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
- {
- return new UnsortedBuilder(columns, nowInSec);
- }
-
- // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
- // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
- // This method is in particular not exposed by the Row API on purpose.
- // This method also *assumes* that the cell we're setting already exists.
- public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
- {
- int idx = binarySearch(column);
- assert idx >= 0;
- if (column.isSimple())
- data[idx] = ((Cell)data[idx]).withUpdatedValue(value);
- else
- ((ComplexColumnData)data[idx]).setValue(path, value);
- }
-
- private int binarySearch(ColumnDefinition column)
- {
- return binarySearch(column, 0, size);
- }
-
- /**
- * Simple binary search for a given column (in the data list).
- *
- * The return value has the exact same meaning as that of Collections.binarySearch() but
- * we don't use the latter because we're searching for a 'ColumnDefinition' in an array of 'ColumnData'.
- */
- private int binarySearch(ColumnDefinition column, int fromIndex, int toIndex)
- {
- int low = fromIndex;
- int mid = toIndex;
- int high = mid - 1;
- int result = -1;
- while (low <= high)
- {
- mid = (low + high) >> 1;
- if ((result = column.compareTo(data[mid].column())) > 0)
- low = mid + 1;
- else if (result == 0)
- return mid;
- else
- high = mid - 1;
- }
- return -mid - (result < 0 ? 1 : 2);
- }
-
- private class ColumnDataIterator extends AbstractIterator<ColumnData>
- {
- private int i;
-
- protected ColumnData computeNext()
- {
- return i < size ? data[i++] : endOfData();
- }
- }
-
- private class CellIterator extends AbstractIterator<Cell>
- {
- private int i;
- private Iterator<Cell> complexCells;
-
- protected Cell computeNext()
- {
- while (true)
- {
- if (complexCells != null)
- {
- if (complexCells.hasNext())
- return complexCells.next();
-
- complexCells = null;
- }
-
- if (i >= size)
- return endOfData();
-
- ColumnData cd = data[i++];
- if (cd.column().isComplex())
- complexCells = ((ComplexColumnData)cd).iterator();
- else
- return (Cell)cd;
- }
- }
- }
-
- private class ColumnSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
- {
- // The index at which the next call to "next" should start looking from
- private int searchFrom = 0;
-
- public boolean hasNext()
- {
- return searchFrom < size;
- }
-
- public ColumnData next(ColumnDefinition column)
- {
- int idx = binarySearch(column, searchFrom, size);
- if (idx < 0)
- {
- searchFrom = -idx - 1;
- return null;
- }
- else
- {
- // We've found it. We'll start after it next time.
- searchFrom = idx + 1;
- return data[idx];
- }
- }
- }
-
- private static abstract class AbstractBuilder implements Row.Builder
- {
- protected final Columns columns;
-
- protected Clustering clustering;
- protected LivenessInfo primaryKeyLivenessInfo;
- protected DeletionTime deletion;
-
- protected List<Cell> cells = new ArrayList<>();
-
- // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion.
- protected DeletionTime[] complexDeletions;
- protected int columnsWithComplexDeletion;
-
- protected AbstractBuilder(Columns columns)
- {
- this.columns = columns;
- this.complexDeletions = new DeletionTime[columns.complexColumnCount()];
- }
-
- public void newRow(Clustering clustering)
- {
- assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
- this.clustering = clustering;
- }
-
- public Clustering clustering()
- {
- return clustering;
- }
-
- protected void reset()
- {
- this.clustering = null;
- this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
- this.deletion = DeletionTime.LIVE;
- this.cells.clear();
- Arrays.fill(this.complexDeletions, null);
- this.columnsWithComplexDeletion = 0;
- }
-
- public void addPrimaryKeyLivenessInfo(LivenessInfo info)
- {
- this.primaryKeyLivenessInfo = info;
- }
-
- public void addRowDeletion(DeletionTime deletion)
- {
- this.deletion = deletion;
- }
-
- public void addCell(Cell cell)
- {
- assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
- cells.add(cell);
- }
-
- public Row build()
- {
- Row row = buildInternal();
- reset();
- return row;
- }
-
- protected abstract Row buildInternal();
-
- protected Row buildNoCells()
- {
- assert cells.isEmpty();
- int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
- if (columnsWithComplexDeletion == 0)
- return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, 0, NO_DATA, minDeletionTime);
-
- ColumnData[] data = new ColumnData[columnsWithComplexDeletion];
- int size = 0;
- for (int i = 0; i < complexDeletions.length; i++)
- {
- DeletionTime complexDeletion = complexDeletions[i];
- if (complexDeletion != null)
- {
- assert !complexDeletion.isLive();
- data[size++] = new ComplexColumnData(columns.getComplex(i), ComplexColumnData.NO_CELLS, complexDeletion);
- minDeletionTime = Integer.MIN_VALUE;
- }
- }
- return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
- }
- }
-
- public static class SortedBuilder extends AbstractBuilder
- {
- private int columnCount;
-
- private ColumnDefinition column;
-
- // The index of the last column for which we've called setColumn if complex.
- private int complexColumnIndex;
-
- // For complex column at index i of 'columns', we store at complexColumnCellsCount[i] its number of added cells.
- private final int[] complexColumnCellsCount;
-
- protected SortedBuilder(Columns columns)
- {
- super(columns);
- this.complexColumnCellsCount = new int[columns.complexColumnCount()];
- reset();
- }
-
- @Override
- protected void reset()
- {
- super.reset();
- this.column = null;
- this.columnCount = 0;
- this.complexColumnIndex = -1;
- Arrays.fill(this.complexColumnCellsCount, 0);
- }
-
- public boolean isSorted()
- {
- return true;
- }
-
- private void setColumn(ColumnDefinition column)
- {
- int cmp = this.column == null ? -1 : this.column.compareTo(column);
- assert cmp <= 0 : "current = " + this.column + ", new = " + column;
- if (cmp != 0)
- {
- this.column = column;
- ++columnCount;
- if (column.isComplex())
- complexColumnIndex = columns.complexIdx(column, complexColumnIndex + 1);
- }
- }
-
- @Override
- public void addCell(Cell cell)
- {
- setColumn(cell.column());
- super.addCell(cell);
- if (column.isComplex())
- complexColumnCellsCount[complexColumnIndex] += 1;
- }
-
- @Override
- public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
- {
- if (complexDeletion.isLive())
- return;
-
- setColumn(column);
- assert complexDeletions[complexColumnIndex] == null;
- complexDeletions[complexColumnIndex] = complexDeletion;
- ++columnsWithComplexDeletion;
- }
-
- protected Row buildInternal()
- {
- if (cells.isEmpty())
- return buildNoCells();
-
- int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
- ColumnData[] data = new ColumnData[columnCount];
- int complexIdx = 0;
- int i = 0;
- int size = 0;
- while (i < cells.size())
- {
- Cell cell = cells.get(i);
- ColumnDefinition column = cell.column();
- if (column.isSimple())
- {
- data[size++] = cell;
- minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cell));
- ++i;
- }
- else
- {
- while (columns.getComplex(complexIdx).compareTo(column) < 0)
- {
- if (complexDeletions[complexIdx] != null)
- {
- data[size++] = new ComplexColumnData(columns.getComplex(complexIdx), ComplexColumnData.NO_CELLS, complexDeletions[complexIdx]);
- minDeletionTime = Integer.MIN_VALUE;
- }
- ++complexIdx;
- }
-
- DeletionTime complexDeletion = complexDeletions[complexIdx];
- if (complexDeletion != null)
- minDeletionTime = Integer.MIN_VALUE;
- int cellCount = complexColumnCellsCount[complexIdx];
- Cell[] complexCells = new Cell[cellCount];
- for (int j = 0; j < cellCount; j++)
- {
- Cell complexCell = cells.get(i + j);
- complexCells[j] = complexCell;
- minDeletionTime = Math.min(minDeletionTime, minDeletionTime(complexCell));
- }
- i += cellCount;
-
- data[size++] = new ComplexColumnData(column, complexCells, complexDeletion == null ? DeletionTime.LIVE : complexDeletion);
- ++complexIdx;
- }
- }
- for (int j = complexIdx; j < complexDeletions.length; j++)
- {
- if (complexDeletions[j] != null)
- {
- data[size++] = new ComplexColumnData(columns.getComplex(j), ComplexColumnData.NO_CELLS, complexDeletions[j]);
- minDeletionTime = Integer.MIN_VALUE;
- }
- }
- assert size == data.length;
- return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
- }
- }
-
- private static class UnsortedBuilder extends AbstractBuilder
- {
- private final int nowInSec;
-
- private UnsortedBuilder(Columns columns, int nowInSec)
- {
- super(columns);
- this.nowInSec = nowInSec;
- reset();
- }
-
- public boolean isSorted()
- {
- return false;
- }
-
- public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
- {
- assert column.isComplex();
- assert column.isStatic() == (clustering == Clustering.STATIC_CLUSTERING);
-
- if (complexDeletion.isLive())
- return;
-
- int complexColumnIndex = columns.complexIdx(column, 0);
-
- DeletionTime previous = complexDeletions[complexColumnIndex];
- if (previous == null || complexDeletion.supersedes(previous))
- {
- complexDeletions[complexColumnIndex] = complexDeletion;
- if (previous == null)
- ++columnsWithComplexDeletion;
- }
- }
-
- protected Row buildInternal()
- {
- // First, the easy cases
- if (cells.isEmpty())
- return buildNoCells();
-
- // Cells have been added in an unsorted way, so sort them first
- Collections.sort(cells, Cell.comparator);
-
- // We now need to
- // 1) merge equal cells together
- // 2) group the cells for a given complex column together, and include their potential complex deletion time.
- // And this without forgetting that some complex columns may have a complex deletion but not cells.
-
- int addedColumns = countAddedColumns();
- ColumnData[] data = new ColumnData[addedColumns];
-
- int nextComplexWithDeletion = findNextComplexWithDeletion(0);
- ColumnDefinition previousColumn = null;
-
- int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-
- int i = 0;
- int size = 0;
- while (i < cells.size())
- {
- Cell cell = cells.get(i++);
- ColumnDefinition column = cell.column();
- if (column.isSimple())
- {
- // Either it's a cell for the same column as our previous cell and we merge them together, or it's a new column
- if (previousColumn != null && previousColumn.compareTo(column) == 0)
- data[size - 1] = Cells.reconcile((Cell)data[size - 1], cell, nowInSec);
- else
- data[size++] = cell;
- }
- else
- {
- // First, collect the complex deletion time for the column we just got the first complex cell of. We'll
- // also find if there are columns that sort before it but had only a complex deletion, and add them.
- DeletionTime complexDeletion = DeletionTime.LIVE;
- while (nextComplexWithDeletion >= 0)
- {
- int cmp = column.compareTo(columns.getComplex(nextComplexWithDeletion));
- if (cmp < 0)
- {
- // This is after the column we're gonna add cell for. We'll deal with it later
- break;
- }
- else if (cmp > 0)
- {
- // We have a column that only has a complex deletion and no cells. Add its data first
- data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
- minDeletionTime = Integer.MIN_VALUE;
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- }
- else // cmp == 0
- {
- // This is the column we're about to add cells for. Record the deletion time and break to the cell addition
- complexDeletion = complexDeletions[nextComplexWithDeletion];
- minDeletionTime = Integer.MIN_VALUE;
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- break;
- }
- }
-
- // Find how many cells the complex column has (cellCount) and the index of the next cell that doesn't belong to it (nextColumnIdx).
- int nextColumnIdx = i; // i is on cell following the current one
- int cellCount = 1; // We have at least the current cell
- Cell previousCell = cell;
- while (nextColumnIdx < cells.size())
- {
- Cell newCell = cells.get(nextColumnIdx);
- if (column.compareTo(newCell.column()) != 0)
- break;
-
- ++nextColumnIdx;
- if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) != 0)
- ++cellCount;
- previousCell = newCell;
- }
- Cell[] columnCells = new Cell[cellCount];
- int complexSize = 0;
- columnCells[complexSize++] = cell;
- previousCell = cell;
- for (int j = i; j < nextColumnIdx; j++)
- {
- Cell newCell = cells.get(j);
- // Either it's a cell for the same path as our previous cell and we merge them together, or it's a new path
- if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) == 0)
- columnCells[complexSize - 1] = Cells.reconcile(previousCell, newCell, nowInSec);
- else
- columnCells[complexSize++] = newCell;
- previousCell = newCell;
- }
- i = nextColumnIdx;
-
- data[size++] = new ComplexColumnData(column, columnCells, complexDeletion);
- }
- previousColumn = column;
- }
- // We may still have some complex columns with only a complex deletion
- while (nextComplexWithDeletion >= 0)
- {
- data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- minDeletionTime = Integer.MIN_VALUE;
- }
- assert size == addedColumns;
-
- // Reconciliation made it harder to compute minDeletionTime for cells in the loop above, so just do it now if we need to.
- if (minDeletionTime != Integer.MIN_VALUE)
- {
- for (ColumnData cd : data)
- minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
- }
-
- return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
- }
-
- private int findNextComplexWithDeletion(int from)
- {
- for (int i = from; i < complexDeletions.length; i++)
- {
- if (complexDeletions[i] != null)
- return i;
- }
- return -1;
- }
-
- // Should only be called once the cells have been sorted
- private int countAddedColumns()
- {
- int columnCount = 0;
- int nextComplexWithDeletion = findNextComplexWithDeletion(0);
- ColumnDefinition previousColumn = null;
- for (Cell cell : cells)
- {
- if (previousColumn != null && previousColumn.compareTo(cell.column()) == 0)
- continue;
-
- ++columnCount;
- previousColumn = cell.column();
-
- // We know that simple columns sort before the complex ones, so don't bother with the column having complex deletion
- // until we've reached the cells of complex columns.
- if (!previousColumn.isComplex())
- continue;
-
- while (nextComplexWithDeletion >= 0)
- {
- // Check how the column we just counted compared to the next with complex deletion
- int cmp = previousColumn.compareTo(columns.getComplex(nextComplexWithDeletion));
- if (cmp < 0)
- {
- // it's before, we'll handle nextComplexWithDeletion later
- break;
- }
- else if (cmp > 0)
- {
- // it's after. nextComplexWithDeletion has no cell but we should count it
- ++columnCount;
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- }
- else // cmp == 0
- {
- // it's the column we just counted. Ignore it and we know we're good with nextComplexWithDeletion for this loop
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- break;
- }
- }
- }
- // Anything remaining in complexDeletions are complex columns with no cells but some complex deletion
- while (nextComplexWithDeletion >= 0)
- {
- ++columnCount;
- nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1);
- }
- return columnCount;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
new file mode 100644
index 0000000..7e9ceb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.base.Function;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class BTreeBackedRow extends AbstractRow
+{
+ private static final ColumnData[] NO_DATA = new ColumnData[0];
+
+ private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY));
+
+ private final Clustering clustering;
+ private final Columns columns;
+ private final LivenessInfo primaryKeyLivenessInfo;
+ private final DeletionTime deletion;
+
+ // The data for each column present in this row, in column sorted order.
+ private final Object[] btree;
+
+ // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones, and then after reconciliation to remove
+ // all tombstones since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so
+ // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones,
+ // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allows us, at any
+ // given time, to know if we have any deleted information or not. If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to
+ // Integer.MIN_VALUE, but if we don't and have expiring cells, this will be the time at which the first expiring cell expires. If we have no tombstones and
+ // no expiring cells, this will be Integer.MAX_VALUE.
+ private final int minLocalDeletionTime;
+
+ private BTreeBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime)
+ {
+ this.clustering = clustering;
+ this.columns = columns;
+ this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+ this.deletion = deletion;
+ this.btree = btree;
+ this.minLocalDeletionTime = minLocalDeletionTime;
+ }
+
+ // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation methods below. Only directly useful in a small number of cases.
+ public static BTreeBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree)
+ {
+ int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+ if (minDeletionTime != Integer.MIN_VALUE)
+ {
+ for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+ minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
+ }
+
+ return new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+ }
+
+ public static BTreeBackedRow emptyRow(Clustering clustering)
+ {
+ return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
+ }
+
+ public static BTreeBackedRow singleCellRow(Clustering clustering, Cell cell)
+ {
+ if (cell.column().isSimple())
+ return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell));
+
+ ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE);
+ return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell));
+ }
+
+ public static BTreeBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+ {
+ assert !deletion.isLive();
+ return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
+ }
+
+ public static BTreeBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
+ {
+ assert !primaryKeyLivenessInfo.isEmpty();
+ return new BTreeBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+ }
+
+ private static int minDeletionTime(Cell cell)
+ {
+ return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime();
+ }
+
+ private static int minDeletionTime(LivenessInfo info)
+ {
+ return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE;
+ }
+
+ private static int minDeletionTime(DeletionTime dt)
+ {
+ return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+ }
+
+ private static int minDeletionTime(ComplexColumnData cd)
+ {
+ int min = minDeletionTime(cd.complexDeletion());
+ for (Cell cell : cd)
+ {
+ min = Math.min(min, minDeletionTime(cell));
+ if (min == Integer.MIN_VALUE)
+ break;
+ }
+ return min;
+ }
+
+ private static int minDeletionTime(ColumnData cd)
+ {
+ return cd.column().isSimple() ? minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd);
+ }
+
+ private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion)
+ {
+ int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+ for (ColumnData cd : BTree.<ColumnData>iterable(btree))
+ {
+ min = Math.min(min, minDeletionTime(cd));
+ if (min == Integer.MIN_VALUE)
+ break;
+ }
+ return min;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public Columns columns()
+ {
+ return columns;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return primaryKeyLivenessInfo;
+ }
+
+ public DeletionTime deletion()
+ {
+ return deletion;
+ }
+
+ public Cell getCell(ColumnDefinition c)
+ {
+ assert !c.isComplex();
+ return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+ }
+
+ public Cell getCell(ColumnDefinition c, CellPath path)
+ {
+ assert c.isComplex();
+ ComplexColumnData cd = getComplexColumnData(c);
+ if (cd == null)
+ return null;
+ return cd.getCell(path);
+ }
+
+ public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+ {
+ assert c.isComplex();
+ return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c);
+ }
+
+ public Iterator<ColumnData> iterator()
+ {
+ return searchIterator();
+ }
+
+ public Iterable<Cell> cells()
+ {
+ return CellIterator::new;
+ }
+
+ public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
+ {
+ return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
+ }
+
+ public Row filter(ColumnFilter filter, CFMetaData metadata)
+ {
+ return filter(filter, DeletionTime.LIVE, false, metadata);
+ }
+
+ public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata)
+ {
+ Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
+
+ if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+ return this;
+
+ boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+ LivenessInfo newInfo = primaryKeyLivenessInfo;
+ DeletionTime newDeletion = deletion;
+ if (mayHaveShadowed)
+ {
+ if (activeDeletion.deletes(newInfo.timestamp()))
+ newInfo = LivenessInfo.EMPTY;
+ // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if we don't have setActiveDeletionToRow,
+ // the row deletion is shadowed and we shouldn't return it.
+ newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE;
+ }
+
+ Columns columns = filter.fetchedColumns().columns(isStatic());
+ Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
+ return transformAndFilter(newInfo, newDeletion, (cd) -> {
+
+ ColumnDefinition column = cd.column();
+ if (!inclusionTester.test(column))
+ return null;
+
+ CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
+ if (column.isComplex())
+ return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+
+ Cell cell = (Cell)cd;
+ return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))
+ ? cell : null;
+ });
+ }
+
+ public boolean hasComplexDeletion()
+ {
+ // We start from the end because we know simple columns sort before complex ones
+ for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
+ {
+ if (cd.column().isSimple())
+ return false;
+
+ if (!((ComplexColumnData)cd).complexDeletion().isLive())
+ return true;
+ }
+ return false;
+ }
+
+ public Row markCounterLocalToBeCleared()
+ {
+ return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+ ? cd.markCounterLocalToBeCleared()
+ : cd);
+ }
+
+ public boolean hasDeletion(int nowInSec)
+ {
+ return nowInSec >= minLocalDeletionTime;
+ }
+
+ /**
+ * Returns a copy of the row where all timestamps for live data have been replaced by {@code newTimestamp} and
+ * all deletion timestamps by {@code newTimestamp - 1}.
+ *
+ * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details.
+ */
+ public Row updateAllTimestamp(long newTimestamp)
+ {
+ LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
+ DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+
+ return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp));
+ }
+
+ public Row purge(DeletionPurger purger, int nowInSec)
+ {
+ if (!hasDeletion(nowInSec))
+ return this;
+
+ LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
+ DeletionTime newDeletion = purger.shouldPurge(deletion) ? DeletionTime.LIVE : deletion;
+
+ return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec));
+ }
+
+ private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function)
+ {
+ Object[] transformed = BTree.transformAndFilter(btree, function);
+
+ if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion)
+ return this;
+
+ if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
+ return null;
+
+ int minDeletionTime = minDeletionTime(transformed, info, deletion);
+ return new BTreeBackedRow(clustering, columns, info, deletion, transformed, minDeletionTime);
+ }
+
+ public int dataSize()
+ {
+ int dataSize = clustering.dataSize()
+ + primaryKeyLivenessInfo.dataSize()
+ + deletion.dataSize();
+
+ for (ColumnData cd : this)
+ dataSize += cd.dataSize();
+ return dataSize;
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ long heapSize = EMPTY_SIZE
+ + clustering.unsharedHeapSizeExcludingData()
+ + BTree.sizeOfStructureOnHeap(btree);
+
+ for (ColumnData cd : this)
+ heapSize += cd.unsharedHeapSizeExcludingData();
+ return heapSize;
+ }
+
+ public static Row.Builder sortedBuilder(Columns columns)
+ {
+ return new Builder(columns, true);
+ }
+
+ public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+ {
+ return new Builder(columns, false, nowInSec);
+ }
+
+ // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general
+ // assumption that Row objects are immutable. This method should go away post-#6506 in particular.
+ // This method is in particular not exposed by the Row API on purpose.
+ // This method also *assumes* that the cell we're setting already exists.
+ public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value)
+ {
+ ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column);
+ if (column.isSimple())
+ BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value));
+ else
+ ((ComplexColumnData) current).setValue(path, value);
+ }
+
+ private class CellIterator extends AbstractIterator<Cell>
+ {
+ private Iterator<ColumnData> columnData = iterator();
+ private Iterator<Cell> complexCells;
+
+ protected Cell computeNext()
+ {
+ while (true)
+ {
+ if (complexCells != null)
+ {
+ if (complexCells.hasNext())
+ return complexCells.next();
+
+ complexCells = null;
+ }
+
+ if (!columnData.hasNext())
+ return endOfData();
+
+ ColumnData cd = columnData.next();
+ if (cd.column().isComplex())
+ complexCells = ((ComplexColumnData)cd).iterator();
+ else
+ return (Cell)cd;
+ }
+ }
+ }
+
+ public static class Builder implements Row.Builder
+ {
+ // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
+ private static class ComplexColumnDeletion extends BufferCell
+ {
+ public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime)
+ {
+ super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM);
+ }
+ }
+
+ // converts a run of Cell with equal column into a ColumnData
+ private static class CellResolver implements BTree.Builder.Resolver
+ {
+ final int nowInSec;
+ private CellResolver(int nowInSec)
+ {
+ this.nowInSec = nowInSec;
+ }
+
+ public ColumnData resolve(Object[] cells, int lb, int ub)
+ {
+ Cell cell = (Cell) cells[lb];
+ ColumnDefinition column = cell.column;
+ if (cell.column.isSimple())
+ {
+ assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
+ while (++lb < ub)
+ cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec);
+ return cell;
+ }
+
+ // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are
+ // bedded in, as less important; galloping makes it pretty cheap anyway)
+ Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator());
+ cell = (Cell) cells[lb];
+ DeletionTime deletion = DeletionTime.LIVE;
+ if (cell instanceof ComplexColumnDeletion)
+ {
+ // TODO: do we need to be robust to multiple of these being provided?
+ deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime());
+ lb++;
+ }
+
+ List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
+ Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
+ return new ComplexColumnData(column, btree, deletion);
+ }
+
+ };
+ protected final Columns columns;
+
+ protected Clustering clustering;
+ protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+ protected DeletionTime deletion = DeletionTime.LIVE;
+
+ private final boolean isSorted;
+ private final BTree.Builder<Cell> cells;
+ private final CellResolver resolver;
+ private boolean hasComplex = false;
+
+ // Complex deletions are not kept in a separate structure: addComplexDeletion() adds them to 'cells' as ComplexColumnDeletion markers, which CellResolver converts back.
+
+ protected Builder(Columns columns, boolean isSorted)
+ {
+ this(columns, isSorted, Integer.MIN_VALUE);
+ }
+
+ protected Builder(Columns columns, boolean isSorted, int nowInSecs)
+ {
+ this.columns = columns;
+ this.cells = BTree.builder(ColumnData.comparator);
+ resolver = new CellResolver(nowInSecs);
+ this.isSorted = isSorted;
+ this.cells.auto(false);
+ }
+
+ public boolean isSorted()
+ {
+ return isSorted;
+ }
+
+ public void newRow(Clustering clustering)
+ {
+ assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before
+ this.clustering = clustering;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ protected void reset()
+ {
+ this.clustering = null;
+ this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+ this.deletion = DeletionTime.LIVE;
+ this.cells.reuse();
+ }
+
+ public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+ {
+ this.primaryKeyLivenessInfo = info;
+ }
+
+ public void addRowDeletion(DeletionTime deletion)
+ {
+ this.deletion = deletion;
+ }
+
+ public void addCell(Cell cell)
+ {
+ assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
+ cells.add(cell);
+ hasComplex |= cell.column.isComplex();
+ }
+
+ public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
+ {
+ cells.add(new ComplexColumnDeletion(column, complexDeletion));
+ hasComplex = true;
+ }
+
+ public Row build()
+ {
+ if (!isSorted)
+ cells.sort();
+ // we can avoid resolving if we're sorted and have no complex values
+ // (because we'll only have unique simple cells, which are already in their final condition)
+ if (!isSorted | hasComplex)
+ cells.resolve(resolver);
+ Object[] btree = cells.build();
+ int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion);
+ Row row = new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+ reset();
+ return row;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e952748..81c42d4 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -36,8 +36,6 @@ public class BufferCell extends AbstractCell
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
- private final ColumnDefinition column;
-
private final long timestamp;
private final int ttl;
private final int localDeletionTime;
@@ -47,8 +45,8 @@ public class BufferCell extends AbstractCell
public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
{
+ super(column);
assert column.isComplex() == (path != null);
- this.column = column;
this.timestamp = timestamp;
this.ttl = ttl;
this.localDeletionTime = localDeletionTime;
@@ -90,11 +88,6 @@ public class BufferCell extends AbstractCell
return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
}
- public ColumnDefinition column()
- {
- return column;
- }
-
public boolean isCounterCell()
{
return !isTombstone() && column.cellValueType().isCounter();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/639d4b24/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index ccb9708..1820de2 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
* 2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they are expired).
* 3) tombstone cells: those won't have value, but they have a local deletion time (when the tombstone was created).
*/
-public interface Cell extends ColumnData
+public abstract class Cell extends ColumnData
{
public static final int NO_TTL = 0;
public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
@@ -51,35 +51,40 @@ public interface Cell extends ColumnData
return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path());
};
- public final Serializer serializer = new BufferCell.Serializer();
+ public static final Serializer serializer = new BufferCell.Serializer();
+
+ protected Cell(ColumnDefinition column)
+ {
+ super(column);
+ }
/**
* Whether the cell is a counter cell or not.
*
* @return whether the cell is a counter cell or not.
*/
- public boolean isCounterCell();
+ public abstract boolean isCounterCell();
/**
* The cell value.
*
* @return the cell value.
*/
- public ByteBuffer value();
+ public abstract ByteBuffer value();
/**
* The cell timestamp.
* <p>
* @return the cell timestamp.
*/
- public long timestamp();
+ public abstract long timestamp();
/**
* The cell ttl.
*
* @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one.
*/
- public int ttl();
+ public abstract int ttl();
/**
* The cell local deletion time.
@@ -87,14 +92,14 @@ public interface Cell extends ColumnData
* @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither
* a tombstone nor an expiring one.
*/
- public int localDeletionTime();
+ public abstract int localDeletionTime();
/**
* Whether the cell is a tombstone or not.
*
* @return whether the cell is a tombstone or not.
*/
- public boolean isTombstone();
+ public abstract boolean isTombstone();
/**
* Whether the cell is an expiring one or not.
@@ -105,7 +110,7 @@ public interface Cell extends ColumnData
*
* @return whether the cell is an expiring one or not.
*/
- public boolean isExpiring();
+ public abstract boolean isExpiring();
/**
* Whether the cell is live or not given the current time.
@@ -114,7 +119,7 @@ public interface Cell extends ColumnData
* decide if an expiring cell is expired or live.
* @return whether the cell is live or not at {@code nowInSec}.
*/
- public boolean isLive(int nowInSec);
+ public abstract boolean isLive(int nowInSec);
/**
* For cells belonging to complex types (non-frozen collection and UDT), the
@@ -122,19 +127,19 @@ public interface Cell extends ColumnData
*
* @return the cell path for cells of complex column, and {@code null} for other cells.
*/
- public CellPath path();
+ public abstract CellPath path();
- public Cell withUpdatedValue(ByteBuffer newValue);
+ public abstract Cell withUpdatedValue(ByteBuffer newValue);
- public Cell copy(AbstractAllocator allocator);
+ public abstract Cell copy(AbstractAllocator allocator);
@Override
// Overrides super type to provide a more precise return type.
- public Cell markCounterLocalToBeCleared();
+ public abstract Cell markCounterLocalToBeCleared();
@Override
// Overrides super type to provide a more precise return type.
- public Cell purge(DeletionPurger purger, int nowInSec);
+ public abstract Cell purge(DeletionPurger purger, int nowInSec);
public interface Serializer
{