You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:47 UTC
[23/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
new file mode 100644
index 0000000..4edd707
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Locks;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.service.StorageService;
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic Partition implementation.
+ *
+ * Operations (in particular addAll) on this implementation are atomic and
+ * isolated (in the sense of ACID). Typically, an addAll guarantees that no
+ * other thread can see a state where only some, but not all, of the rows have
+ * been added.
+ */
+public class AtomicBTreePartition implements Partition
+{
+ private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class);
+
+ public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
+ StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
+ null));
+
+ // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
+ private static final int TRACKER_NEVER_WASTED = 0;
+ private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
+
+ // The granularity with which we track wasted allocation/work; we round up
+ private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
+ // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
+ private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
+ private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
+ // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
+ private static final int CLOCK_SHIFT = 17;
+ // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 2^17ns ~= 131us == (1/7.63)ms
+
+ /**
+ * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
+ * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s
+ *
+ * wasteTracker holds a clock-based value that is kept within EXCESS_WASTE_OFFSET ticks before the current time;
+ * whenever we waste bytes we add the waste to the current value if it is within this window, and otherwise set it
+ * to the start (minimum) of the window plus our waste.
+ */
+ private volatile int wasteTracker = TRACKER_NEVER_WASTED;
+
+ private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker");
+
+ private static final DeletionInfo LIVE = DeletionInfo.live();
+ // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
+ // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+ private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS);
+
+ private final CFMetaData metadata;
+ private final DecoratedKey partitionKey;
+ private final MemtableAllocator allocator;
+
+ private volatile Holder ref;
+
+ private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref");
+
+    /**
+     * Creates a partition that initially contains no data.
+     *
+     * @param metadata the metadata of the table this partition belongs to.
+     * @param partitionKey the key of this partition.
+     * @param allocator the memtable allocator used for the rows stored in this partition.
+     */
+    public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
+    {
+        this.allocator = allocator;
+        this.partitionKey = partitionKey;
+        this.metadata = metadata;
+        // Every partition starts from the shared EMPTY holder; addAllWithSizeDelta replaces it on update.
+        this.ref = EMPTY;
+    }
+
+    /**
+     * Whether this partition currently holds no data at all: no deletion, no row and no static row.
+     * <p>
+     * The three conditions are evaluated against a single snapshot of the holder, so the answer
+     * is consistent even if a concurrent update replaces {@code ref} while this method runs.
+     *
+     * @return {@code true} if the deletion info is live, the row tree is empty and there is no static row.
+     */
+    public boolean isEmpty()
+    {
+        // 'ref' is volatile and swapped by CAS in addAllWithSizeDelta; read it once rather than once per condition.
+        final Holder current = ref;
+        return current.deletionInfo.isLive() && BTree.isEmpty(current.tree) && current.staticRow == null;
+    }
+
+    /**
+     * @return the table metadata this partition was created with.
+     */
+    public CFMetaData metadata()
+    {
+        return this.metadata;
+    }
+
+    /**
+     * @return the key of this partition, as provided at construction.
+     */
+    public DecoratedKey partitionKey()
+    {
+        return this.partitionKey;
+    }
+
+    /**
+     * @return the partition deletion recorded in the current deletion info of this partition.
+     */
+    public DeletionTime partitionLevelDeletion()
+    {
+        final DeletionInfo currentDeletionInfo = ref.deletionInfo;
+        return currentDeletionInfo.getPartitionDeletion();
+    }
+
+    /**
+     * @return all the partition columns of the table: which columns updates will actually
+     * touch is not known, so every column of the table is assumed.
+     */
+    public PartitionColumns columns()
+    {
+        final PartitionColumns allColumns = metadata.partitionColumns();
+        return allColumns;
+    }
+
+    /**
+     * @return {@code true} if the row tree of this partition currently contains at least one entry.
+     */
+    public boolean hasRows()
+    {
+        final Object[] rows = ref.tree;
+        return !BTree.isEmpty(rows);
+    }
+
+    /**
+     * @return the stats held by the current state of this partition.
+     */
+    public RowStats stats()
+    {
+        final Holder current = ref;
+        return current.stats;
+    }
+
+    /**
+     * Looks up the row with the provided clustering, selecting all columns.
+     *
+     * @param clustering the clustering of the row to fetch ({@code Clustering.STATIC_CLUSTERING} for the static row).
+     * @return the row found, or {@code null} if the lookup finds nothing or if the static row
+     * was requested and it is empty.
+     */
+    public Row getRow(Clustering clustering)
+    {
+        final ColumnFilter allColumns = ColumnFilter.selection(columns());
+        final Row found = searchIterator(allColumns, false).next(clustering);
+        if (found == null)
+            return null;
+
+        // The lookup never yields null for statics, it yields an empty row instead. Returning null
+        // when there is no real static row keeps this method consistent with the non-static case.
+        if (clustering == Clustering.STATIC_CLUSTERING && found.isEmpty())
+            return null;
+
+        return found;
+    }
+
+ /**
+ * Returns an iterator to look up rows of this partition by clustering.
+ *
+ * The iterator works on the holder that is current when this method is called; later updates to
+ * the partition replace {@code ref} and are not seen by an iterator that was already created.
+ *
+ * NOTE(review): non-static results go through the single {@code filter}/{@code row} pair held by the
+ * iterator, so a returned row is presumably only valid until the following call to {@code next} -
+ * confirm against FilteringRow/ReusableRow.
+ *
+ * @param columns the filter deciding which columns and cells are returned.
+ * @param reversed whether the lookups are done in reverse order (the underlying btree iterator is
+ * created with the opposite of this flag).
+ * @return a search iterator whose {@code next} returns {@code null} when there is neither data nor a
+ * range tombstone (more recent than the partition deletion) for the requested clustering.
+ */
+ public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+ {
+ // TODO: we could optimize comparison for "NativeRow" à la #6755
+ // Snapshot of the partition state this iterator works on
+ final Holder current = ref;
+ return new SearchIterator<Clustering, Row>()
+ {
+ private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+ private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
+ private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
+ private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
+
+ public boolean hasNext()
+ {
+ return rawIter.hasNext();
+ }
+
+ public Row next(Clustering key)
+ {
+ // The static row is not stored in the tree, it is built from the holder directly
+ if (key == Clustering.STATIC_CLUSTERING)
+ return makeStatic(columns, current, allocator);
+
+ MemtableRowData data = rawIter.next(key);
+ // We also need to find if there is a range tombstone covering this key
+ RangeTombstone rt = current.deletionInfo.rangeCovering(key);
+
+ if (data == null)
+ {
+ // If we have a range tombstone but no data, "fake" the RT by returning a row deletion
+ // corresponding to the tombstone.
+ if (rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion)
+ return filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, rt.deletionTime()));
+ return null;
+ }
+
+ row.setTo(data);
+
+ // Clear any row deletion override left by a previous call
+ filter.setRowDeletion(null);
+ if (rt == null || rt.deletionTime().markedForDeleteAt() < partitionDeletion)
+ {
+ // No range tombstone, or one older than the partition deletion: filter on the partition deletion only
+ filter.setDeletionTimestamp(partitionDeletion);
+ }
+ else
+ {
+ filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
+ // If we have a range tombstone covering that row and it's bigger than the row deletion itself, then
+ // we replace the row deletion by the tombstone deletion as a way to return the tombstone.
+ if (rt.deletionTime().supersedes(row.deletion()))
+ filter.setRowDeletion(rt.deletionTime());
+ }
+
+ return filter.setTo(row);
+ }
+ };
+ }
+
+    /**
+     * Builds a row that has no column and no cell and only carries a clustering and a deletion.
+     *
+     * @param clustering the clustering the returned row reports.
+     * @param deletion the deletion the returned row reports.
+     * @return a row with the provided clustering and deletion and no other content.
+     */
+    private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion)
+    {
+        return new AbstractRow()
+        {
+            // The only two pieces of information this row really has.
+
+            public Clustering clustering()
+            {
+                return clustering;
+            }
+
+            public DeletionTime deletion()
+            {
+                return deletion;
+            }
+
+            // Everything else describes the absence of content.
+
+            public boolean isEmpty()
+            {
+                return true;
+            }
+
+            public Columns columns()
+            {
+                return Columns.NONE;
+            }
+
+            public LivenessInfo primaryKeyLivenessInfo()
+            {
+                return LivenessInfo.NONE;
+            }
+
+            public boolean hasComplexDeletion()
+            {
+                return false;
+            }
+
+            public DeletionTime getDeletion(ColumnDefinition c)
+            {
+                return DeletionTime.LIVE;
+            }
+
+            public Cell getCell(ColumnDefinition c)
+            {
+                return null;
+            }
+
+            public Cell getCell(ColumnDefinition c, CellPath path)
+            {
+                return null;
+            }
+
+            public Iterator<Cell> getCells(ColumnDefinition c)
+            {
+                return null;
+            }
+
+            public Iterator<Cell> iterator()
+            {
+                return Iterators.<Cell>emptyIterator();
+            }
+
+            public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+            {
+                // A fresh iterator that never has, nor finds, anything.
+                return new SearchIterator<ColumnDefinition, ColumnData>()
+                {
+                    public ColumnData next(ColumnDefinition column)
+                    {
+                        return null;
+                    }
+
+                    public boolean hasNext()
+                    {
+                        return false;
+                    }
+                };
+            }
+
+            public Row takeAlias()
+            {
+                return this;
+            }
+        };
+    }
+
+    /**
+     * @return an iterator over the whole partition: all columns, all slices, in forward order.
+     */
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        final ColumnFilter allColumns = ColumnFilter.selection(columns());
+        return unfilteredIterator(allColumns, Slices.ALL, false);
+    }
+
+    /**
+     * Returns an iterator over the part of this partition selected by the provided filter and slices.
+     *
+     * @param selection the columns and cells to include.
+     * @param slices the slices of the partition to return.
+     * @param reversed whether the result is iterated in reverse order.
+     * @return an iterator over the selected data. With no slice at all, the iterator has no row and
+     * only exposes the partition deletion and static row, or is fully empty when no static column is
+     * fetched and the partition deletion is live.
+     */
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        final int sliceCount = slices.size();
+        if (sliceCount == 1)
+            return new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator);
+        if (sliceCount != 0)
+            return new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator);
+
+        // No slice selected: there is no row to return, only partition level information.
+        final Holder snapshot = ref;
+        final DeletionTime partitionDeletion = snapshot.deletionInfo.getPartitionDeletion();
+        final boolean noStaticFetched = selection.fetchedColumns().statics.isEmpty();
+        if (noStaticFetched && partitionDeletion.isLive())
+            return UnfilteredRowIterators.emptyIterator(metadata, partitionKey, reversed);
+
+        return new AbstractUnfilteredRowIterator(metadata,
+                                                 partitionKey,
+                                                 partitionDeletion,
+                                                 selection.fetchedColumns(),
+                                                 makeStatic(selection, snapshot, allocator),
+                                                 reversed,
+                                                 snapshot.stats)
+        {
+            protected Unfiltered computeNext()
+            {
+                return endOfData();
+            }
+        };
+    }
+
+ private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator)
+ {
+ Columns statics = selection.fetchedColumns().statics;
+ if (statics.isEmpty() || holder.staticRow == null)
+ return Rows.EMPTY_STATIC_ROW;
+
+ return new ReusableFilteringRow(statics, selection)
+ .setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt())
+ .setTo(allocator.newReusableRow().setTo(holder.staticRow));
+ }
+
+    /**
+     * A {@code FilteringRow} that can be reconfigured between uses: the deletion timestamp below
+     * which information is excluded, and an optional row deletion override, can both be changed.
+     *
+     * NOTE(review): the {@code include} methods are presumably the hooks {@code FilteringRow} calls
+     * to decide what the wrapped row exposes - confirm against FilteringRow.
+     */
+    private static class ReusableFilteringRow extends FilteringRow
+    {
+        private final Columns columns;
+        private final ColumnFilter selection;
+        private long deletionTimestamp;
+
+        // Used by searchIterator in case the row is covered by a tombstone.
+        private DeletionTime rowDeletion;
+
+        /**
+         * @param columns the columns that are included.
+         * @param selection the filter used to decide which cells are included.
+         */
+        public ReusableFilteringRow(Columns columns, ColumnFilter selection)
+        {
+            this.columns = columns;
+            this.selection = selection;
+        }
+
+        /**
+         * Sets the timestamp that liveness info and deletions must be strictly greater than to be included.
+         *
+         * @return this object, to allow chaining.
+         */
+        public ReusableFilteringRow setDeletionTimestamp(long timestamp)
+        {
+            this.deletionTimestamp = timestamp;
+            return this;
+        }
+
+        /**
+         * Sets the deletion reported by {@link #deletion()} instead of the inherited one.
+         *
+         * @param rowDeletion the deletion to report, or {@code null} to report the inherited one.
+         * @return this object, to allow chaining.
+         */
+        public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion)
+        {
+            this.rowDeletion = rowDeletion;
+            return this;
+        }
+
+        @Override
+        public DeletionTime deletion()
+        {
+            return rowDeletion == null ? super.deletion() : rowDeletion;
+        }
+
+        @Override
+        protected boolean include(LivenessInfo info)
+        {
+            return isAfterDeletion(info.timestamp());
+        }
+
+        @Override
+        protected boolean include(ColumnDefinition def)
+        {
+            return columns.contains(def);
+        }
+
+        @Override
+        protected boolean include(DeletionTime dt)
+        {
+            return isAfterDeletion(dt.markedForDeleteAt());
+        }
+
+        @Override
+        protected boolean include(ColumnDefinition c, DeletionTime dt)
+        {
+            return isAfterDeletion(dt.markedForDeleteAt());
+        }
+
+        @Override
+        protected boolean include(Cell cell)
+        {
+            return selection.includes(cell);
+        }
+
+        // Whether the provided timestamp is strictly greater than the configured deletion timestamp.
+        private boolean isAfterDeletion(long timestamp)
+        {
+            return timestamp > deletionTimestamp;
+        }
+    }
+
+ private static class SingleSliceIterator extends AbstractUnfilteredRowIterator
+ {
+ private final Iterator<Unfiltered> iterator;
+ private final ReusableFilteringRow row;
+
+ private SingleSliceIterator(CFMetaData metadata,
+ DecoratedKey key,
+ Holder holder,
+ ColumnFilter selection,
+ Slice slice,
+ boolean isReversed,
+ MemtableAllocator allocator)
+ {
+ super(metadata,
+ key,
+ holder.deletionInfo.getPartitionDeletion(),
+ selection.fetchedColumns(),
+ makeStatic(selection, holder, allocator),
+ isReversed,
+ holder.stats);
+
+ Iterator<Row> rowIter = rowIter(metadata,
+ holder,
+ slice,
+ !isReversed,
+ allocator);
+
+ this.iterator = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed)
+ .setTo(rowIter, holder.deletionInfo.rangeIterator(slice, isReversed));
+
+ this.row = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection)
+ .setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt());
+ }
+
+ private Iterator<Row> rowIter(CFMetaData metadata,
+ Holder holder,
+ Slice slice,
+ boolean forwards,
+ final MemtableAllocator allocator)
+ {
+ Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+ Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
+ final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards);
+ return new AbstractIterator<Row>()
+ {
+ private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
+
+ protected Row computeNext()
+ {
+ return dataIter.hasNext() ? row.setTo(dataIter.next()) : endOfData();
+ }
+ };
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (iterator.hasNext())
+ {
+ Unfiltered next = iterator.next();
+ if (next.kind() == Unfiltered.Kind.ROW)
+ {
+ row.setTo((Row)next);
+ if (!row.isEmpty())
+ return row;
+ }
+ else
+ {
+ RangeTombstoneMarker marker = (RangeTombstoneMarker)next;
+
+ long deletion = partitionLevelDeletion().markedForDeleteAt();
+ if (marker.isOpen(isReverseOrder()))
+ deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt());
+ row.setDeletionTimestamp(deletion);
+ return marker;
+ }
+ }
+ return endOfData();
+ }
+ }
+
+    /**
+     * Iterates over multiple slices of a partition state by going through one
+     * {@code SingleSliceIterator} per slice, one after the other.
+     */
+    public static class SlicesIterator extends AbstractUnfilteredRowIterator
+    {
+        private final Holder holder;
+        private final MemtableAllocator allocator;
+        private final ColumnFilter selection;
+        private final Slices slices;
+
+        // Number of slices for which an iterator has been created so far
+        private int idx;
+        // Iterator over the slice being returned, or null if the next slice has to be opened
+        private UnfilteredRowIterator currentSlice;
+
+        private SlicesIterator(CFMetaData metadata,
+                               DecoratedKey key,
+                               Holder holder,
+                               ColumnFilter selection,
+                               Slices slices,
+                               boolean isReversed,
+                               MemtableAllocator allocator)
+        {
+            super(metadata,
+                  key,
+                  holder.deletionInfo.getPartitionDeletion(),
+                  selection.fetchedColumns(),
+                  makeStatic(selection, holder, allocator),
+                  isReversed,
+                  holder.stats);
+            this.slices = slices;
+            this.allocator = allocator;
+            this.selection = selection;
+            this.holder = holder;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            for (;;)
+            {
+                if (currentSlice != null)
+                {
+                    if (currentSlice.hasNext())
+                        return currentSlice.next();
+
+                    // The current slice is exhausted, move on to the next one
+                    currentSlice = null;
+                }
+
+                if (idx >= slices.size())
+                    return endOfData();
+
+                // In reverse order, slices are taken from the last one to the first one
+                final int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
+                currentSlice = new SingleSliceIterator(metadata,
+                                                       partitionKey,
+                                                       holder,
+                                                       selection,
+                                                       slices.get(sliceIdx),
+                                                       isReverseOrder,
+                                                       allocator);
+                idx++;
+            }
+        }
+    }
+
+ /**
+ * Adds a given update to this in-memtable partition.
+ *
+ * The update is merged into a new holder built from the current one, and that new holder is installed
+ * with a compare-and-set on {@code ref}; if the compare-and-set fails the whole merge is retried against
+ * the new current holder. Retries are done while holding the monitor of this object once pessimistic
+ * locking is in use (see {@link #usePessimisticLocking} and {@link #updateWastedAllocationTracker}).
+ *
+ * @param update the update to merge into this partition.
+ * @param writeOp the operation group passed to the row updater (used for its allocations).
+ * @param indexer the index updater passed to the row updater; its row level indexes are updated once the
+ * update has been successfully applied.
+ * @return an array containing first the difference in size seen after merging the updates, and second the minimum
+ * time delta between updates.
+ */
+ public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer)
+ {
+ RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer);
+ // Copy of the update deletion info, made at most once and reused across retries
+ DeletionInfo inputDeletionInfoCopy = null;
+
+ // Whether this thread currently holds the monitor of this object (released in the finally block)
+ boolean monitorOwned = false;
+ try
+ {
+ if (usePessimisticLocking())
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ while (true)
+ {
+ Holder current = ref;
+ updater.ref = current;
+ // Clears the sizes accumulated by a previous attempt and aborts the rows it allocated
+ updater.reset();
+
+ DeletionInfo deletionInfo;
+ if (update.deletionInfo().mayModify(current.deletionInfo))
+ {
+ if (inputDeletionInfoCopy == null)
+ inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance);
+
+ // Never modify the deletion info of the current holder: merge into a copy
+ deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
+ updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ }
+ else
+ {
+ deletionInfo = current.deletionInfo;
+ }
+
+ Row newStatic = update.staticRow();
+ // Keep the current static row if the update has none, otherwise insert or merge the new one
+ MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
+ ? current.staticRow
+ : (current.staticRow == null ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
+ Object[] tree = BTree.<Clusterable, Row, MemtableRowData>update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
+ RowStats newStats = current.stats.mergeWith(update.stats());
+
+ // NOTE(review): a null tree presumably means the btree update stopped early because
+ // updater.abortEarly() saw that 'ref' had changed - confirm against BTree.update
+ if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
+ {
+ indexer.updateRowLevelIndexes();
+ updater.finish();
+ return new long[]{ updater.dataSize, updater.colUpdateTimeDelta };
+ }
+ else if (!monitorOwned)
+ {
+ // The attempt failed without the monitor: account for the heap it wasted, and take the
+ // monitor before retrying if pessimistic locking is (or just became) in use
+ boolean shouldLock = usePessimisticLocking();
+ if (!shouldLock)
+ {
+ shouldLock = updateWastedAllocationTracker(updater.heapSize);
+ }
+ if (shouldLock)
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ }
+ }
+ }
+ finally
+ {
+ if (monitorOwned)
+ Locks.monitorExitUnsafe(this);
+ }
+
+ }
+
+    /**
+     * @return {@code true} if the waste tracker holds the value reserved for pessimistic locking.
+     */
+    public boolean usePessimisticLocking()
+    {
+        final int tracker = wasteTracker;
+        return tracker == TRACKER_PESSIMISTIC_LOCKING;
+    }
+
+ /**
+ * Update the wasted allocation tracker state based on newly wasted allocation information
+ *
+ * The tracker is compared to a clock value (nanoTime shifted by CLOCK_SHIFT): the new waste, in units of
+ * ALLOCATION_GRANULARITY_BYTES, is added to the tracker, and the waste limit is reached when the tracker
+ * would catch up with the current clock value. Once the limit is reached the tracker is set to
+ * TRACKER_PESSIMISTIC_LOCKING, and this method never moves it away from that value.
+ *
+ * @param wastedBytes the number of bytes wasted by this thread
+ * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
+ */
+ private boolean updateWastedAllocationTracker(long wastedBytes)
+ {
+ // Early check for huge allocation that exceeds the limit
+ if (wastedBytes < EXCESS_WASTE_BYTES)
+ {
+ // We round up to ensure work < granularity are still accounted for
+ int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
+
+ int oldTrackerValue;
+ // Retry until our update is applied, or until the tracker is (or must be) switched to pessimistic locking
+ while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
+ {
+ // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
+ int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
+ // Position of the tracker relative to the current time; expected to be in [-EXCESS_WASTE_OFFSET, 0)
+ int delta = oldTrackerValue - time;
+ // Never wasted before, or tracker outside of the window: restart from the beginning of the window
+ if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
+ delta = -EXCESS_WASTE_OFFSET;
+ delta += wastedAllocation;
+ // The tracker would reach the current time: the waste limit is reached
+ if (delta >= 0)
+ break;
+ // A failed compare-and-set means the tracker was changed concurrently: loop and recompute
+ if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
+ return false;
+ }
+ }
+ // We have definitely reached our waste limit so set the state if it isn't already
+ wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
+ // And tell the caller to proceed with pessimistic locking
+ return true;
+ }
+
+    /**
+     * Makes sure a computed tracker value is not mistaken for one of the two reserved values.
+     *
+     * @param wasteTracker the tracker value to check.
+     * @return the provided value, or that value plus one if it equals TRACKER_NEVER_WASTED
+     * or TRACKER_PESSIMISTIC_LOCKING.
+     */
+    private static int avoidReservedValues(int wasteTracker)
+    {
+        switch (wasteTracker)
+        {
+            case TRACKER_NEVER_WASTED:
+            case TRACKER_PESSIMISTIC_LOCKING:
+                // The reserved values are not consecutive, so the next value is never reserved
+                return wasteTracker + 1;
+            default:
+                return wasteTracker;
+        }
+    }
+
+    /**
+     * The state of a partition: all fields are final, and a partition changes state by
+     * switching its {@code ref} to another holder.
+     */
+    private static final class Holder
+    {
+        final DeletionInfo deletionInfo;
+        // the btree of rows
+        final Object[] tree;
+        final MemtableRowData staticRow;
+        final RowStats stats;
+
+        Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats)
+        {
+            this.deletionInfo = deletionInfo;
+            this.tree = tree;
+            this.staticRow = staticRow;
+            this.stats = stats;
+        }
+
+        /**
+         * @param info the deletion info of the returned holder.
+         * @return a new holder with the provided deletion info and the tree, static row and stats of this one.
+         */
+        Holder with(DeletionInfo info)
+        {
+            return new Holder(tree, info, staticRow, stats);
+        }
+    }
+
+ // the function we provide to the btree utilities to perform any column replacements
+ private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData>
+ {
+ final AtomicBTreePartition updating;
+ final MemtableAllocator allocator;
+ final OpOrder.Group writeOp;
+ final Updater indexer;
+ final int nowInSec;
+ Holder ref;
+ long dataSize;
+ long heapSize;
+ long colUpdateTimeDelta = Long.MAX_VALUE;
+ final MemtableRowData.ReusableRow row;
+ final MemtableAllocator.DataReclaimer reclaimer;
+ final MemtableAllocator.RowAllocator rowAllocator;
+ List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree
+
+ private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ this.updating = updating;
+ this.allocator = allocator;
+ this.writeOp = writeOp;
+ this.indexer = indexer;
+ this.nowInSec = FBUtilities.nowInSeconds();
+ this.row = allocator.newReusableRow();
+ this.reclaimer = allocator.reclaimer();
+ this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp);
+ }
+
+ public MemtableRowData apply(Row insert)
+ {
+ rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic());
+ insert.copyTo(rowAllocator);
+ MemtableRowData data = rowAllocator.allocatedRowData();
+
+ insertIntoIndexes(insert);
+
+ this.dataSize += data.dataSize();
+ this.heapSize += data.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(data);
+ return data;
+ }
+
+ public MemtableRowData apply(MemtableRowData existing, Row update)
+ {
+ Columns mergedColumns = existing.columns().mergeTo(update.columns());
+ rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic());
+
+ colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer));
+
+ MemtableRowData reconciled = rowAllocator.allocatedRowData();
+
+ dataSize += reconciled.dataSize() - existing.dataSize();
+ heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(reconciled);
+ discard(existing);
+
+ return reconciled;
+ }
+
+ private void insertIntoIndexes(Row toInsert)
+ {
+ if (indexer == SecondaryIndexManager.nullUpdater)
+ return;
+
+ maybeIndexPrimaryKeyColumns(toInsert);
+ Clustering clustering = toInsert.clustering();
+ for (Cell cell : toInsert)
+ indexer.insert(clustering, cell);
+ }
+
+ private void maybeIndexPrimaryKeyColumns(Row row)
+ {
+ // We want to update a primary key index with the most up to date info contains in that inserted row (if only for
+ // backward compatibility). Note that if there is an index but not a partition key or clustering column one, we've
+ // wasting this work. We might be able to avoid that if row indexing was pushed in the index updater.
+ long timestamp = row.primaryKeyLivenessInfo().timestamp();
+ int ttl = row.primaryKeyLivenessInfo().ttl();
+
+ for (Cell cell : row)
+ {
+ long cellTimestamp = cell.livenessInfo().timestamp();
+ if (cell.isLive(nowInSec))
+ {
+ if (cellTimestamp > timestamp)
+ {
+ timestamp = cellTimestamp;
+ ttl = cell.livenessInfo().ttl();
+ }
+ }
+ }
+
+ indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion());
+ }
+
+ protected void reset()
+ {
+ this.dataSize = 0;
+ this.heapSize = 0;
+ if (inserted != null)
+ {
+ for (MemtableRowData row : inserted)
+ abort(row);
+ inserted.clear();
+ }
+ reclaimer.cancel();
+ }
+
+ protected void abort(MemtableRowData abort)
+ {
+ reclaimer.reclaimImmediately(abort);
+ }
+
+ protected void discard(MemtableRowData discard)
+ {
+ reclaimer.reclaim(discard);
+ }
+
+ public boolean abortEarly()
+ {
+ return updating.ref != ref;
+ }
+
+ public void allocated(long heapSize)
+ {
+ this.heapSize += heapSize;
+ }
+
+ protected void finish()
+ {
+ allocator.onHeap().allocate(heapSize, writeOp);
+ reclaimer.commit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
new file mode 100644
index 0000000..dc482f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.ISerializer;
+
+/**
+ * A partition stored in the partition cache.
+ *
+ * Note that in practice, the only implementation of this is {@link ArrayBackedPartition}.
+ * We keep this interface mainly 1) to make it clear what we need from a partition in the cache
+ * (that we don't need otherwise) and 2) because {@code ArrayBackedPartition} is used for other
+ * purposes (than caching), and hence using {@code CachedPartition} when we talk about caching is
+ * clearer.
+ */
+public interface CachedPartition extends Partition, IRowCacheEntry
+{
+    ISerializer<CachedPartition> cacheSerializer = new ArrayBackedCachedPartition.Serializer();
+
+    /**
+     * The number of {@code Row} objects in this cached partition.
+     *
+     * Please note that this is <b>not</b> the number of <em>live</em> rows since
+     * some of the rows may only contain deleted (or expired) information.
+     *
+     * @return the number of rows in the partition.
+     */
+    int rowCount();
+
+    /**
+     * The number of rows that were live at the time the partition was cached.
+     *
+     * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this.
+     *
+     * @return the number of rows in this partition that were live at the time the
+     * partition was cached (this can be different from the number of live rows now
+     * due to expiring cells).
+     */
+    int cachedLiveRows();
+
+    /**
+     * The number of rows in this cached partition that have at least one non-expiring
+     * non-deleted cell.
+     *
+     * Note that this is generally not a very meaningful number, but this is used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of rows that have at least one non-expiring non-deleted cell.
+     */
+    int rowsWithNonExpiringCells();
+
+    /**
+     * The last row in this cached partition (in other words, the row with the
+     * biggest clustering that the partition contains).
+     *
+     * @return the last row of the partition, or {@code null} if the partition is empty.
+     */
+    Row lastRow();
+
+    /**
+     * The number of {@code Cell} objects that are not tombstones in this cached partition.
+     *
+     * Please note that this is <b>not</b> the number of <em>live</em> cells since
+     * some of the cells might be expired.
+     *
+     * @return the number of non-tombstone cells in the partition.
+     */
+    int nonTombstoneCellCount();
+
+    /**
+     * The number of cells in this cached partition that are neither tombstones nor expiring.
+     *
+     * Note that this is generally not a very meaningful number, but this is used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of cells that are neither tombstones nor expiring.
+     */
+    int nonExpiringLiveCells();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
new file mode 100644
index 0000000..16445e7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+/**
+ * A partition iterator that counts what it returns through a {@code DataLimits.Counter}, and
+ * stops returning partitions as soon as that counter is done.
+ */
+public class CountingPartitionIterator extends WrappingPartitionIterator
+{
+    protected final DataLimits.Counter counter;
+
+    /**
+     * @param result the iterator to wrap.
+     * @param counter the counter to use.
+     */
+    public CountingPartitionIterator(PartitionIterator result, DataLimits.Counter counter)
+    {
+        super(result);
+        this.counter = counter;
+    }
+
+    /**
+     * @param result the iterator to wrap.
+     * @param limits the limits from which a new counter is created.
+     * @param nowInSec the current time in seconds, passed to the new counter.
+     */
+    public CountingPartitionIterator(PartitionIterator result, DataLimits limits, int nowInSec)
+    {
+        this(result, limits.newCounter(nowInSec, true));
+    }
+
+    /**
+     * @return the counter used by this iterator.
+     */
+    public DataLimits.Counter counter()
+    {
+        return this.counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // Once the counter is done, the wrapped iterator is not consulted anymore
+        return !counter.isDone() && super.hasNext();
+    }
+
+    @Override
+    @SuppressWarnings("resource") // Close through the closing of the returned 'CountingRowIterator' (and CountingRowIterator shouldn't throw)
+    public RowIterator next()
+    {
+        final RowIterator partition = super.next();
+        return new CountingRowIterator(partition, counter);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
new file mode 100644
index 0000000..4ad321e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+public class CountingRowIterator extends WrappingRowIterator
+{
+    protected final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code iter} and immediately tells {@code counter} that a new partition starts,
+     * handing it the partition key and the static row of {@code iter}.
+     */
+    public CountingRowIterator(RowIterator iter, DataLimits.Counter counter)
+    {
+        super(iter);
+        this.counter = counter;
+        this.counter.newPartition(iter.partitionKey(), iter.staticRow());
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // Stop as soon as the counter is done for this partition, without consulting the wrapped iterator.
+        return !counter.isDoneForPartition() && super.hasNext();
+    }
+
+    @Override
+    public Row next()
+    {
+        // Every row handed out is first reported to the counter.
+        final Row current = super.next();
+        counter.newRow(current);
+        return current;
+    }
+
+    @Override
+    public void close()
+    {
+        // The wrapped iterator is closed first; the counter is then told the partition is over.
+        super.close();
+        counter.endOfPartition();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..52eedd4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+public class CountingUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator
+{
+    // The same counter instance is handed to every CountingUnfilteredRowIterator created by computeNext().
+    protected final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code result} so that what it yields is reported to {@code counter}.
+     */
+    public CountingUnfilteredPartitionIterator(UnfilteredPartitionIterator result, DataLimits.Counter counter)
+    {
+        super(result);
+        this.counter = counter;
+    }
+
+    /**
+     * The counter this iterator reports to.
+     */
+    public DataLimits.Counter counter()
+    {
+        return counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // Once the counter reports it is done, the wrapped iterator is not consulted anymore.
+        return !counter.isDone() && super.hasNext();
+    }
+
+    /**
+     * Decorates the provided partition with a {@link CountingUnfilteredRowIterator}
+     * bound to this iterator's counter.
+     */
+    @Override
+    public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    {
+        return new CountingUnfilteredRowIterator(iter, counter);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
new file mode 100644
index 0000000..acaef5d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+
+public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+{
+    private final DataLimits.Counter counter;
+
+    /**
+     * Wraps {@code iter} and immediately tells {@code counter} that a new partition starts,
+     * handing it the partition key and the static row of {@code iter}.
+     */
+    public CountingUnfilteredRowIterator(UnfilteredRowIterator iter, DataLimits.Counter counter)
+    {
+        super(iter);
+        this.counter = counter;
+        this.counter.newPartition(iter.partitionKey(), iter.staticRow());
+    }
+
+    /**
+     * The counter this iterator reports to.
+     */
+    public DataLimits.Counter counter()
+    {
+        return counter;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        // Stop as soon as the counter is done for this partition, without consulting the wrapped iterator.
+        return !counter.isDoneForPartition() && super.hasNext();
+    }
+
+    @Override
+    public Unfiltered next()
+    {
+        final Unfiltered item = super.next();
+
+        // Only rows are reported to the counter; any other kind of Unfiltered is passed through as is.
+        final boolean isRow = item.kind() == Unfiltered.Kind.ROW;
+        if (isRow)
+            counter.newRow((Row) item);
+
+        return item;
+    }
+
+    @Override
+    public void close()
+    {
+        // The wrapped iterator is closed first; the counter is then told the partition is over.
+        super.close();
+        counter.endOfPartition();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
new file mode 100644
index 0000000..813654d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+public class FilteredPartition extends AbstractPartitionData implements Iterable<Row>
+{
+    // Instances are only built through create(); the partition level deletion is always DeletionTime.LIVE.
+    private FilteredPartition(CFMetaData metadata,
+                              DecoratedKey partitionKey,
+                              PartitionColumns columns,
+                              int initialRowCapacity,
+                              boolean sortable)
+    {
+        super(metadata, partitionKey, DeletionTime.LIVE, columns, initialRowCapacity, sortable);
+    }
+
+    /**
+     * Builds a FilteredPartition containing every row produced by the provided iterator.
+     *
+     * Warning: the iterator is consumed but NOT closed by this method; closing it remains
+     * the responsibility of the caller.
+     */
+    public static FilteredPartition create(RowIterator iterator)
+    {
+        FilteredPartition result = new FilteredPartition(iterator.metadata(),
+                                                         iterator.partitionKey(),
+                                                         iterator.columns(),
+                                                         4,
+                                                         iterator.isReverseOrder());
+        result.staticRow = iterator.staticRow().takeAlias();
+
+        Writer rowWriter = result.new Writer(true);
+        while (iterator.hasNext())
+        {
+            Row row = iterator.next();
+            row.copyTo(rowWriter);
+        }
+
+        // A Partition (or more precisely AbstractPartitionData) always assumes its data is in clustering
+        // order, so rows that were just added in reverse clustering order have to be reversed.
+        if (iterator.isReverseOrder())
+            result.reverse();
+
+        return result;
+    }
+
+    /**
+     * Returns a RowIterator over the rows of this partition. The returned iterator always
+     * reports a non-reversed order, does not support removal, and its close() does nothing.
+     */
+    public RowIterator rowIterator()
+    {
+        final Iterator<Row> rows = iterator();
+        return new RowIterator()
+        {
+            public boolean hasNext()
+            {
+                return rows.hasNext();
+            }
+
+            public Row next()
+            {
+                return rows.next();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public CFMetaData metadata()
+            {
+                return metadata;
+            }
+
+            public DecoratedKey partitionKey()
+            {
+                return key;
+            }
+
+            public PartitionColumns columns()
+            {
+                return columns;
+            }
+
+            public Row staticRow()
+            {
+                // A missing static row is exposed as the shared empty static row rather than null.
+                if (staticRow == null)
+                    return Rows.EMPTY_STATIC_ROW;
+
+                return staticRow;
+            }
+
+            public boolean isReverseOrder()
+            {
+                return false;
+            }
+
+            public void close()
+            {
+            }
+        };
+    }
+
+    @Override
+    public String toString()
+    {
+        try (RowIterator rows = rowIterator())
+        {
+            CFMetaData cfm = rows.metadata();
+            PartitionColumns cols = rows.columns();
+
+            // Header line first, then one line for the static row (if any) and one line per row.
+            String header = String.format("[%s.%s] key=%s columns=%s reversed=%b",
+                                          cfm.ksName,
+                                          cfm.cfName,
+                                          cfm.getKeyValidator().getString(rows.partitionKey().getKey()),
+                                          cols,
+                                          rows.isReverseOrder());
+            StringBuilder out = new StringBuilder();
+            out.append(header);
+
+            Row statics = rows.staticRow();
+            if (statics != Rows.EMPTY_STATIC_ROW)
+                out.append("\n ").append(statics.toString(cfm));
+
+            while (rows.hasNext())
+            {
+                Row row = rows.next();
+                out.append("\n ").append(row.toString(cfm));
+            }
+
+            return out.toString();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
new file mode 100644
index 0000000..c40109b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * Abstract class to make it easier to write iterators that filter some
+ * parts of another iterator (used for purging tombstones and removing dropped columns).
+ */
+public abstract class FilteringPartitionIterator extends WrappingUnfilteredPartitionIterator
+{
+ private UnfilteredRowIterator next;
+
+ protected FilteringPartitionIterator(UnfilteredPartitionIterator iter)
+ {
+ super(iter);
+ }
+
+ // The filter to use for filtering row contents. Is null by default to mean no particular filtering
+ // but can be overriden by subclasses. Please see FilteringAtomIterator for details on how this is used.
+ protected FilteringRow makeRowFilter()
+ {
+ return null;
+ }
+
+ // Whether or not we should bother filtering the provided rows iterator. This
+ // exists mainly for preformance
+ protected boolean shouldFilter(UnfilteredRowIterator iterator)
+ {
+ return true;
+ }
+
+ protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ return true;
+ }
+
+ protected boolean includePartitionDeletion(DeletionTime dt)
+ {
+ return true;
+ }
+
+ // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called.
+ protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+ {
+ return marker;
+ }
+
+ // Called when a particular partition is skipped due to being empty post filtering
+ protected void onEmpty(DecoratedKey key)
+ {
+ }
+
+ public boolean hasNext()
+ {
+ while (next == null && super.hasNext())
+ {
+ UnfilteredRowIterator iterator = super.next();
+ if (shouldFilter(iterator))
+ {
+ next = new FilteringIterator(iterator);
+ if (!isForThrift() && next.isEmpty())
+ {
+ onEmpty(iterator.partitionKey());
+ iterator.close();
+ next = null;
+ }
+ }
+ else
+ {
+ next = iterator;
+ }
+ }
+ return next != null;
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ UnfilteredRowIterator toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ @Override
+ public void close()
+ {
+ try
+ {
+ super.close();
+ }
+ finally
+ {
+ if (next != null)
+ next.close();
+ }
+ }
+
+ private class FilteringIterator extends FilteringRowIterator
+ {
+ private FilteringIterator(UnfilteredRowIterator iterator)
+ {
+ super(iterator);
+ }
+
+ @Override
+ protected FilteringRow makeRowFilter()
+ {
+ return FilteringPartitionIterator.this.makeRowFilter();
+ }
+
+ @Override
+ protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
+ {
+ return FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker);
+ }
+
+ @Override
+ protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
+ {
+ return FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed);
+ }
+
+ @Override
+ protected boolean includePartitionDeletion(DeletionTime dt)
+ {
+ return FilteringPartitionIterator.this.includePartitionDeletion(dt);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/Partition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java
new file mode 100644
index 0000000..71d0411
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * A partition held in memory.
+ *
+ * Most of the storage engine works through iterators (UnfilteredPartitionIterator) precisely
+ * to avoid "materializing" a full partition/query response in memory as much as possible,
+ * so Partition objects should be used as sparingly as possible. There are a couple
+ * of cases where a partition does need to be represented in memory (memtables and row cache).
+ */
+public interface Partition
+{
+    CFMetaData metadata();
+
+    DecoratedKey partitionKey();
+
+    DeletionTime partitionLevelDeletion();
+
+    PartitionColumns columns();
+
+    RowStats stats();
+
+    /**
+     * Whether the partition object holds no information at all, deletion information included.
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns the row corresponding to the provided clustering, or null if there is no such row.
+     */
+    Row getRow(Clustering clustering);
+
+    /**
+     * Returns an iterator that allows searching for specific rows efficiently.
+     */
+    SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed);
+
+    /**
+     * Returns an UnfilteredRowIterator over all the rows/RT contained by this partition.
+     */
+    UnfilteredRowIterator unfilteredIterator();
+
+    /**
+     * Returns an UnfilteredRowIterator over the rows/RT contained by this partition
+     * that are selected by the provided slices.
+     */
+    UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
new file mode 100644
index 0000000..36358fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * An iterator over a number of (filtered) partitions.
+ *
+ * PartitionIterator is to RowIterator what UnfilteredPartitionIterator is to UnfilteredRowIterator,
+ * though unlike UnfilteredPartitionIterator, it is not guaranteed that the returned RowIterators
+ * are in partitioner order.
+ *
+ * The object returned by a call to next() is only guaranteed to be
+ * valid until the next call to hasNext() or next(). A consumer that wants to keep a
+ * reference on a returned object for longer than that must explicitly
+ * make a copy of it.
+ */
+public interface PartitionIterator extends Iterator<RowIterator>, AutoCloseable
+{
+    // Redeclared without a throws clause so that closing does not force callers to handle checked exceptions.
+    @Override
+    void close();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
new file mode 100644
index 0000000..219aa5a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Static helper methods to create and consume {@link PartitionIterator} instances.
+ */
+public abstract class PartitionIterators
+{
+    private PartitionIterators() {}
+
+    /**
+     * A PartitionIterator that contains no partition.
+     */
+    public static final PartitionIterator EMPTY = new PartitionIterator()
+    {
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public RowIterator next()
+        {
+            throw new NoSuchElementException();
+        }
+
+        public void remove()
+        {
+            // Fail loudly instead of silently ignoring the call, like the iterator returned by concat() does.
+            throw new UnsupportedOperationException();
+        }
+
+        public void close()
+        {
+        }
+    };
+
+    /**
+     * Returns the first partition of the provided iterator, or an empty RowIterator (built from
+     * the metadata, partition key and reversed flag of {@code command}) if the iterator has none.
+     * <p>
+     * Closing the returned RowIterator also closes {@code iter}.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command)
+    {
+        // If the query has no results, we'll get an empty iterator, but we still
+        // want a RowIterator out of this method, so we return an empty one.
+        RowIterator toReturn = iter.hasNext()
+                             ? iter.next()
+                             : RowIterators.emptyIterator(command.metadata(),
+                                                          command.partitionKey(),
+                                                          command.clusteringIndexFilter().isReversed());
+
+        // Wrap the result so that its close method actually closes the whole PartitionIterator.
+        return new WrappingRowIterator(toReturn)
+        {
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    try
+                    {
+                        // Only assert this now: UnfilteredPartitionIterators.Serializer (which might be used
+                        // under the provided iterator) does not cope with hasNext() being called before the
+                        // previously returned iterator has been fully consumed.
+                        assert !iter.hasNext();
+                    }
+                    finally
+                    {
+                        // Must run even if the assertion (or hasNext() itself) fails, or the
+                        // partition iterator would be leaked.
+                        iter.close();
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns an iterator that yields the partitions of each of the provided iterators in turn,
+     * in list order. If the list holds exactly one iterator, that iterator is returned directly.
+     * <p>
+     * Closing the returned iterator closes all the provided iterators.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static PartitionIterator concat(final List<PartitionIterator> iterators)
+    {
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        return new PartitionIterator()
+        {
+            // Index of the iterator currently being consumed.
+            private int idx = 0;
+
+            public boolean hasNext()
+            {
+                // Skip over exhausted iterators until one with remaining partitions is found.
+                while (idx < iterators.size())
+                {
+                    if (iterators.get(idx).hasNext())
+                        return true;
+
+                    ++idx;
+                }
+                return false;
+            }
+
+            public RowIterator next()
+            {
+                // hasNext() also positions idx on the iterator to read from.
+                if (!hasNext())
+                    throw new NoSuchElementException();
+                return iterators.get(idx).next();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                FileUtils.closeQuietly(iterators);
+            }
+        };
+    }
+
+    /**
+     * Feeds every partition of the provided iterator to the provided digest. Each partition
+     * is closed once digested, but the partition iterator itself is not closed by this method.
+     */
+    public static void digest(PartitionIterator iterator, MessageDigest digest)
+    {
+        while (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
+            {
+                RowIterators.digest(partition, digest);
+            }
+        }
+    }
+
+    /**
+     * Returns a PartitionIterator whose only partition is the provided one. Closing the
+     * returned iterator closes the provided RowIterator.
+     */
+    public static PartitionIterator singletonIterator(RowIterator iterator)
+    {
+        return new SingletonPartitionIterator(iterator);
+    }
+
+    /**
+     * Iterates over every row of every partition of the provided iterator, discarding them.
+     * Each partition is closed once consumed, but the partition iterator itself is not closed
+     * by this method.
+     */
+    public static void consume(PartitionIterator iterator)
+    {
+        while (iterator.hasNext())
+        {
+            try (RowIterator partition = iterator.next())
+            {
+                while (partition.hasNext())
+                    partition.next();
+            }
+        }
+    }
+
+    /**
+     * Wraps the provided iterator so it logs the returned rows for debugging purposes.
+     * <p>
+     * Note that this is only meant for debugging as this can produce a very large amount of
+     * log output at INFO.
+     */
+    @SuppressWarnings("resource") // The created resources are returned right away
+    public static PartitionIterator loggingIterator(PartitionIterator iterator, final String id)
+    {
+        return new WrappingPartitionIterator(iterator)
+        {
+            public RowIterator next()
+            {
+                return RowIterators.loggingIterator(super.next(), id);
+            }
+        };
+    }
+
+    // A PartitionIterator that returns the single RowIterator it was created with, exactly once.
+    private static class SingletonPartitionIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+    {
+        private final RowIterator iterator;
+        // Whether the wrapped iterator has already been handed out by computeNext().
+        private boolean returned;
+
+        private SingletonPartitionIterator(RowIterator iterator)
+        {
+            this.iterator = iterator;
+        }
+
+        protected RowIterator computeNext()
+        {
+            if (returned)
+                return endOfData();
+
+            returned = true;
+            return iterator;
+        }
+
+        public void close()
+        {
+            iterator.close();
+        }
+    }
+}