You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:39 UTC
[10/15] cassandra git commit: Simplify some 8099's implementations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
new file mode 100644
index 0000000..d79ab06
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.util.*;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+/**
+ * Abstract common class for all non-thread safe Partition implementations.
+ */
+public abstract class AbstractThreadUnsafePartition implements Partition, Iterable<Row>
+{
+ protected final CFMetaData metadata;
+ protected final DecoratedKey key;
+
+ protected final PartitionColumns columns;
+
+ protected final List<Row> rows;
+
+ /**
+ * Stores the given metadata, partition key, columns and row list as-is (the list is not copied).
+ *
+ * NOTE(review): the search helpers of this class binary-search {@code rows} with the metadata
+ * comparator, so the list presumably has to be sorted in clustering order already -- confirm against callers.
+ *
+ * @param metadata the metadata of the table this is a partition of.
+ * @param key the partition key.
+ * @param columns the columns of the partition.
+ * @param rows the rows of the partition.
+ */
+ protected AbstractThreadUnsafePartition(CFMetaData metadata,
+ DecoratedKey key,
+ PartitionColumns columns,
+ List<Row> rows)
+ {
+ this.metadata = metadata;
+ this.key = key;
+ this.columns = columns;
+ this.rows = rows;
+ }
+
+ /** Returns the table metadata that was passed to the constructor. */
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ /** Returns the partition key that was passed to the constructor. */
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ /** Returns the partition-level deletion, as read from {@link #deletionInfo()}. */
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionInfo().getPartitionDeletion();
+ }
+
+ /** Returns the partition columns that were passed to the constructor. */
+ public PartitionColumns columns()
+ {
+ return columns;
+ }
+
+ public abstract Row staticRow();
+
+ protected abstract boolean canHaveShadowedData();
+
+ /**
+ * The deletion info for the partition update.
+ *
+ * Note: do not cast the result to a {@code MutableDeletionInfo} to modify it!
+ *
+ * @return the deletion info for the partition update for use as read-only.
+ */
+ public abstract DeletionInfo deletionInfo();
+
+ /** Returns the number of entries in the rows list (the static row is held separately and not counted). */
+ public int rowCount()
+ {
+ return rows.size();
+ }
+
+ /**
+  * Tells whether this partition holds nothing at all.
+  *
+  * @return {@code true} only if the deletion info reports itself live, the rows list is empty
+  * and the static row is empty.
+  */
+ public boolean isEmpty()
+ {
+     if (!deletionInfo().isLive())
+         return false;
+     if (!rows.isEmpty())
+         return false;
+     return staticRow().isEmpty();
+ }
+
+ /**
+ * Builds a multi-line debugging representation: a header with the keyspace/table names, the partition
+ * key, the columns and (when not live) the deletion info, followed by one line for the static row
+ * (unless it is {@code Rows.EMPTY_STATIC_ROW}) and one line per row.
+ */
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ CFMetaData metadata = metadata();
+ sb.append(String.format("Partition[%s.%s] key=%s columns=%s%s",
+ metadata().ksName,
+ metadata().cfName,
+ metadata().getKeyValidator().getString(partitionKey().getKey()),
+ columns(),
+ deletionInfo().isLive() ? "" : " " + deletionInfo()));
+
+ // Identity comparison on purpose: only the shared empty static row constant is skipped.
+ if (staticRow() != Rows.EMPTY_STATIC_ROW)
+ sb.append("\n ").append(staticRow().toString(metadata, true));
+
+ // Rows are printed in whatever order iterator() yields them (the for-each goes through Iterable).
+ // NOTE(review): this spot used to claim that createRowIterator() is called instead of iterator() to
+ // avoid sorting for PartitionUpdate (handy when printing an update while it is being built); no such
+ // call exists in this method, so confirm whether subclasses overriding iterator() still need that.
+ for (Row row : this)
+ sb.append("\n ").append(row.toString(metadata, true));
+
+ return sb.toString();
+ }
+
+ /**
+  * Looks up a single row by clustering, selecting the columns of this partition.
+  *
+  * @param clustering the clustering of the row to look up.
+  * @return the row found, or {@code null} if there is none. An empty static row is reported as {@code null} too.
+  */
+ public Row getRow(Clustering clustering)
+ {
+     ColumnFilter selection = ColumnFilter.selection(columns());
+     Row found = searchIterator(selection, false).next(clustering);
+     if (found == null)
+         return null;
+
+     // For the static clustering the search iterator never hands back null but an empty row instead;
+     // answering null when there is no real static row keeps this method consistent.
+     if (clustering == Clustering.STATIC_CLUSTERING && found.isEmpty())
+         return null;
+
+     return found;
+ }
+
+ /**
+ * Returns an iterator that iterates over the rows of this partition in clustering order.
+ * This is the iterator of the backing rows list; the static row is not part of it.
+ *
+ * @return an iterator over the rows of this partition.
+ */
+ public Iterator<Row> iterator()
+ {
+ return rows.iterator();
+ }
+
+ /**
+ * Returns a search iterator to look rows up by clustering.
+ *
+ * The returned iterator is backed by a forward or reverse {@code RowSearcher}, which keeps a cursor
+ * and only searches the part of the rows list it has not moved past yet; clusterings are therefore
+ * presumably expected to be requested in iteration order (reversed or not) -- confirm against callers.
+ *
+ * @param columns the columns to keep in the returned rows.
+ * @param reversed whether lookups walk the rows list from the end rather than from the start.
+ * @return a search iterator over the rows of this partition.
+ */
+ public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed)
+ {
+ final RowSearcher searcher = reversed ? new ReverseRowSearcher() : new ForwardRowSearcher();
+ return new SearchIterator<Clustering, Row>()
+ {
+ public boolean hasNext()
+ {
+ return !searcher.isDone();
+ }
+
+ public Row next(Clustering clustering)
+ {
+ // The static row is not part of the rows list and is served without touching the searcher.
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ {
+ Row staticRow = staticRow();
+ return staticRow.isEmpty() || columns.fetchedColumns().statics.isEmpty()
+ ? Rows.EMPTY_STATIC_ROW
+ : staticRow.filter(columns, partitionLevelDeletion(), true, metadata);
+ }
+
+ Row row = searcher.search(clustering);
+ RangeTombstone rt = deletionInfo().rangeCovering(clustering);
+
+ // A search iterator only returns a row, so it doesn't allow to directly account for deletion that should apply to the row
+ // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion
+ // to carry the proper deletion on the row.
+ DeletionTime activeDeletion = partitionLevelDeletion();
+ if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+ activeDeletion = rt.deletionTime();
+
+ // No row stored for that clustering: still surface a non-live deletion as an empty deleted row.
+ if (row == null)
+ return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
+
+ return row.filter(columns, activeDeletion, true, metadata);
+ }
+ };
+ }
+
+ /** Returns an unfiltered iterator over the whole partition: all columns of the table, all slices, not reversed. */
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
+ }
+
+ /**
+ * Returns an unfiltered iterator restricted to the given slices, obtained by handing a sliceable
+ * iterator over this partition to {@code slices.makeSliceIterator()}.
+ *
+ * @param columns the columns to include.
+ * @param slices the slices to restrict the iteration to.
+ * @param reversed whether to iterate in reverse order.
+ */
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed)
+ {
+ return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed));
+ }
+
+ /** Returns a sliceable iterator over this partition selecting all columns of the table, not reversed. */
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+ {
+ return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false);
+ }
+
+ /**
+ * Returns a new sliceable iterator over this partition.
+ *
+ * @param selection the columns to include.
+ * @param reversed whether the iterator is in reverse order.
+ */
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
+ {
+ return new SliceableIterator(this, selection, reversed);
+ }
+
+ /**
+  * Simple binary search for the row with a given clustering, in the index range
+  * [fromIndex, toIndex) of the rows list.
+  *
+  * The return value has the exact same meaning as the one of Collections.binarySearch() (the index of
+  * the match, or {@code -(insertion point) - 1} if there is none), indexes being absolute indexes into
+  * the rows list. We don't use the latter because we're searching for a 'Clustering' in a list of 'Row'
+  * (and while both are Clusterable, it's slightly faster to use the 'Clustering' comparison (see comment
+  * on ClusteringComparator.rowComparator())).
+  *
+  * @param clustering the clustering to search for.
+  * @param fromIndex the first index to consider (inclusive).
+  * @param toIndex the end of the range to consider (exclusive).
+  */
+ private int binarySearch(Clustering clustering, int fromIndex, int toIndex)
+ {
+     ClusteringComparator comparator = metadata().comparator;
+     int low = fromIndex;
+     // Starting with mid == toIndex and a negative result makes an empty range report toIndex as insertion point.
+     int mid = toIndex;
+     int high = mid - 1;
+     int result = -1;
+     while (low <= high)
+     {
+         // Unsigned shift: the midpoint stays correct even if 'low + high' overflows an int.
+         mid = (low + high) >>> 1;
+         if ((result = comparator.compare(clustering, rows.get(mid).clustering())) > 0)
+             low = mid + 1;
+         else if (result == 0)
+             return mid;
+         else
+             high = mid - 1;
+     }
+     // result < 0: the clustering sorts before rows[mid], so mid is the insertion point; otherwise it is mid + 1.
+     return -mid - (result < 0 ? 1 : 2);
+ }
+
+ /**
+ * Unfiltered iterator over the enclosing partition that can also be queried slice by slice.
+ * Rows (taken from the rows list) are merged with the range tombstones of {@code deletionInfo()}.
+ */
+ private class SliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator
+ {
+ private final ColumnFilter columns;
+ // Created on the first call to slice() and reused by later calls.
+ private RowSearcher searcher;
+
+ // Backing iterator for plain (non-sliced) iteration, created on the first call to computeNext().
+ private Iterator<Unfiltered> iterator;
+
+ private SliceableIterator(AbstractThreadUnsafePartition partition, ColumnFilter columns, boolean isReverseOrder)
+ {
+ super(partition.metadata(),
+ partition.partitionKey(),
+ partition.partitionLevelDeletion(),
+ columns.fetchedColumns(),
+ partition.staticRow().isEmpty() ? Rows.EMPTY_STATIC_ROW : partition.staticRow().filter(columns, partition.partitionLevelDeletion(), false, partition.metadata()),
+ isReverseOrder,
+ partition.stats());
+ this.columns = columns;
+ }
+
+ // Iterates over the whole partition: all rows (through a reversed view of the list if in reverse order)
+ // merged with all range tombstones.
+ protected Unfiltered computeNext()
+ {
+ if (iterator == null)
+ iterator = merge(isReverseOrder ? Lists.reverse(rows).iterator(): iterator(), deletionInfo().rangeIterator(isReverseOrder()));
+
+ return iterator.hasNext() ? iterator.next() : endOfData();
+ }
+
+ // Returns the rows and range tombstones selected by the given slice. The searcher is shared between
+ // calls, so each call resumes from where the previous slice ended.
+ public Iterator<Unfiltered> slice(Slice slice)
+ {
+ if (searcher == null)
+ searcher = isReverseOrder() ? new ReverseRowSearcher() : new ForwardRowSearcher();
+ return merge(searcher.slice(slice), deletionInfo().rangeIterator(slice, isReverseOrder()));
+ }
+
+ // Combines a row iterator and a range tombstone iterator into a single stream of Unfiltered.
+ private Iterator<Unfiltered> merge(Iterator<Row> rows, Iterator<RangeTombstone> ranges)
+ {
+ return new RowAndDeletionMergeIterator(metadata,
+ partitionKey,
+ partitionLevelDeletion,
+ columns,
+ staticRow(),
+ isReverseOrder(),
+ stats(),
+ rows,
+ ranges,
+ canHaveShadowedData());
+ }
+ }
+
+ /**
+  * Utility class to search for rows or slice of rows in order.
+  *
+  * Both {@code search(..., from, to)} helpers look at the index range [from, to) of the rows list and
+  * follow the Collections.binarySearch() convention for their result (index of the match, or
+  * {@code -(insertion point) - 1}), with indexes always being absolute indexes into the rows list.
+  */
+ private abstract class RowSearcher
+ {
+     /** Whether the searcher has moved past every row. */
+     public abstract boolean isDone();
+
+     /** Returns the row with the given clustering, or {@code null} if it is not found in the remaining rows. */
+     public abstract Row search(Clustering name);
+
+     /** Returns the remaining rows that are selected by the given slice. */
+     public abstract Iterator<Row> slice(Slice slice);
+
+     protected int search(Clustering clustering, int from, int to)
+     {
+         return binarySearch(clustering, from, to);
+     }
+
+     protected int search(Slice.Bound bound, int from, int to)
+     {
+         // The search runs on a sub-list view, so its result is relative to 'from': shift it back to an
+         // absolute index (for a negative result, -(ip) - 1 becomes -(ip + from) - 1, i.e. 'relative - from').
+         int relative = Collections.binarySearch(rows.subList(from, to), bound, metadata.comparator);
+         return relative < 0 ? relative - from : relative + from;
+     }
+ }
+
+ /**
+ * RowSearcher that walks the rows list from the start towards the end.
+ * {@code nextIdx} is the first index that is still a candidate; the searcher is done once it reaches the list size.
+ */
+ private class ForwardRowSearcher extends RowSearcher
+ {
+ private int nextIdx = 0;
+
+ public boolean isDone()
+ {
+ return nextIdx >= rows.size();
+ }
+
+ public Row search(Clustering name)
+ {
+ if (isDone())
+ return null;
+
+ int idx = search(name, nextIdx, rows.size());
+ if (idx < 0)
+ {
+ // Not found: resume from the insertion point next time
+ nextIdx = -idx - 1;
+ return null;
+ }
+ else
+ {
+ // Found: resume right after the match next time
+ nextIdx = idx + 1;
+ return rows.get(idx);
+ }
+ }
+
+ public Iterator<Row> slice(Slice slice)
+ {
+ // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will
+ // be a match, so we save from testing for it.
+
+ // NOTE(review): the computations below use the search results as absolute indexes into the rows list;
+ // confirm the Slice.Bound search helper really returns absolute (not sub-list relative) indexes.
+ final int start = -search(slice.start(), nextIdx, rows.size()) - 1; // First index to include
+ if (start >= rows.size())
+ return Collections.emptyIterator();
+
+ final int end = -search(slice.end(), start, rows.size()) - 1; // First index to exclude
+
+ // Remember the end to speed up potential further slice search
+ nextIdx = end;
+
+ if (start >= end)
+ return Collections.emptyIterator();
+
+ return rows.subList(start, end).iterator();
+ }
+ }
+
+ /**
+  * RowSearcher that walks the rows list from the end towards the start.
+  * {@code nextIdx} is the last index that is still a candidate (inclusive); the searcher is done once it
+  * drops below 0.
+  */
+ private class ReverseRowSearcher extends RowSearcher
+ {
+     private int nextIdx = rows.size() - 1;
+
+     public boolean isDone()
+     {
+         return nextIdx < 0;
+     }
+
+     public Row search(Clustering name)
+     {
+         if (isDone())
+             return null;
+
+         // The upper bound of the search is exclusive while nextIdx is inclusive, hence the '+ 1' (as done
+         // in slice() below). Without it the row at nextIdx, initially the very last row, can never be found.
+         int idx = search(name, 0, nextIdx + 1);
+         if (idx < 0)
+         {
+             // The insertion point is the first element greater than name, so we want start from the previous one next time
+             nextIdx = -idx - 2;
+             return null;
+         }
+         else
+         {
+             nextIdx = idx - 1;
+             return rows.get(idx);
+         }
+     }
+
+     public Iterator<Row> slice(Slice slice)
+     {
+         // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will
+         // be a match, so we save from testing for it.
+
+         // The insertion point is the first element greater than slice.end(), so we want the previous index
+         final int start = -search(slice.end(), 0, nextIdx + 1) - 2;  // First index to include
+         if (start < 0)
+             return Collections.emptyIterator();
+
+         final int end = -search(slice.start(), 0, start + 1) - 2;  // First index to exclude
+
+         // Remember the end to speed up potential further slice search
+         nextIdx = end;
+
+         if (start < end)
+             return Collections.emptyIterator();
+
+         // Rows end+1..start (inclusive) are the selected ones; hand them out last to first.
+         return Lists.reverse(rows.subList(end+1, start+1)).iterator();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..f7d7222
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * A partition iterator that allows to filter/modify the unfiltered from the
+ * underlying iterators.
+ * <p>
+ * Each partition produced by the wrapped iterator is wrapped in turn so that its static row, rows and
+ * range tombstone markers go through the {@code computeNext*} hooks of this class, along with the key
+ * of the partition they belong to. The hooks hand their argument back untouched unless overridden.
+ */
+public abstract class AlteringUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator
+{
+    protected AlteringUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped)
+    {
+        super(wrapped);
+    }
+
+    /** Hook for the static row of the partition with the given key; returns it unchanged here. */
+    protected Row computeNextStatic(DecoratedKey partitionKey, Row row)
+    {
+        return row;
+    }
+
+    /** Hook for each row of the partition with the given key; returns it unchanged here. */
+    protected Row computeNext(DecoratedKey partitionKey, Row row)
+    {
+        return row;
+    }
+
+    /** Hook for each range tombstone marker of the partition with the given key; returns it unchanged here. */
+    protected RangeTombstoneMarker computeNext(DecoratedKey partitionKey, RangeTombstoneMarker marker)
+    {
+        return marker;
+    }
+
+    /** Wraps a partition so that its content is routed through the hooks above. */
+    @Override
+    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+    {
+        final DecoratedKey key = iter.partitionKey();
+        final AlteringUnfilteredPartitionIterator outer = this;
+        return new AlteringUnfilteredRowIterator(iter)
+        {
+            protected Row computeNextStatic(Row staticRow)
+            {
+                return outer.computeNextStatic(key, staticRow);
+            }
+
+            protected Row computeNext(Row regularRow)
+            {
+                return outer.computeNext(key, regularRow);
+            }
+
+            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker rangeMarker)
+            {
+                return outer.computeNext(key, rangeMarker);
+            }
+        };
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index bec8056..f39245b 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -18,11 +18,11 @@
package org.apache.cassandra.db.partitions;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -33,24 +33,31 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
{
private final int createdAtInSec;
- // Note that those fields are really immutable, but we can't easily pass their values to
- // the ctor so they are not final.
- private int cachedLiveRows;
- private int rowsWithNonExpiringCells;
+ private final int cachedLiveRows;
+ private final int rowsWithNonExpiringCells;
- private int nonTombstoneCellCount;
- private int nonExpiringLiveCells;
+ private final int nonTombstoneCellCount;
+ private final int nonExpiringLiveCells;
private ArrayBackedCachedPartition(CFMetaData metadata,
DecoratedKey partitionKey,
- DeletionTime deletionTime,
PartitionColumns columns,
- int initialRowCapacity,
- boolean sortable,
- int createdAtInSec)
+ Row staticRow,
+ List<Row> rows,
+ DeletionInfo deletionInfo,
+ RowStats stats,
+ int createdAtInSec,
+ int cachedLiveRows,
+ int rowsWithNonExpiringCells,
+ int nonTombstoneCellCount,
+ int nonExpiringLiveCells)
{
- super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+ super(metadata, partitionKey, columns, staticRow, rows, deletionInfo, stats);
this.createdAtInSec = createdAtInSec;
+ this.cachedLiveRows = cachedLiveRows;
+ this.rowsWithNonExpiringCells = rowsWithNonExpiringCells;
+ this.nonTombstoneCellCount = nonTombstoneCellCount;
+ this.nonExpiringLiveCells = nonExpiringLiveCells;
}
/**
@@ -65,7 +72,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
*/
public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec)
{
- return create(iterator, 4, nowInSec);
+ return create(iterator, 16, nowInSec);
}
/**
@@ -82,30 +89,76 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
*/
public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec)
{
- ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(iterator.metadata(),
- iterator.partitionKey(),
- iterator.partitionLevelDeletion(),
- iterator.columns(),
- initialRowCapacity,
- iterator.isReverseOrder(),
- nowInSec);
+ CFMetaData metadata = iterator.metadata();
+ boolean reversed = iterator.isReverseOrder();
- partition.staticRow = iterator.staticRow().takeAlias();
+ List<Row> rows = new ArrayList<>(initialRowCapacity);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
- Writer writer = partition.new Writer(nowInSec);
- RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder());
+ int cachedLiveRows = 0;
+ int rowsWithNonExpiringCells = 0;
- copyAll(iterator, writer, markerCollector, partition);
+ int nonTombstoneCellCount = 0;
+ int nonExpiringLiveCells = 0;
- return partition;
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ {
+ Row row = (Row)unfiltered;
+ rows.add(row);
+
+ // Collect stats
+ if (row.hasLiveData(nowInSec))
+ ++cachedLiveRows;
+
+ boolean hasNonExpiringCell = false;
+ for (Cell cell : row.cells())
+ {
+ if (!cell.isTombstone())
+ {
+ ++nonTombstoneCellCount;
+ if (!cell.isExpiring())
+ {
+ hasNonExpiringCell = true;
+ ++nonExpiringLiveCells;
+ }
+ }
+ }
+
+ if (hasNonExpiringCell)
+ ++rowsWithNonExpiringCells;
+ }
+ else
+ {
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+ }
+ }
+
+ if (reversed)
+ Collections.reverse(rows);
+
+ return new ArrayBackedCachedPartition(metadata,
+ iterator.partitionKey(),
+ iterator.columns(),
+ iterator.staticRow(),
+ rows,
+ deletionBuilder.build(),
+ iterator.stats(),
+ nowInSec,
+ cachedLiveRows,
+ rowsWithNonExpiringCells,
+ nonTombstoneCellCount,
+ nonExpiringLiveCells);
}
public Row lastRow()
{
- if (rows == 0)
+ if (rows.isEmpty())
return null;
- return new InternalReusableRow().setTo(rows - 1);
+ return rows.get(rows.size() - 1);
}
/**
@@ -146,62 +199,6 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
return nonExpiringLiveCells;
}
- // Writers that collect the values for 'cachedLiveRows', 'rowsWithNonExpiringCells', 'nonTombstoneCellCount'
- // and 'nonExpiringLiveCells'.
- protected class Writer extends AbstractPartitionData.Writer
- {
- private final int nowInSec;
-
- private boolean hasLiveData;
- private boolean hasNonExpiringCell;
-
- protected Writer(int nowInSec)
- {
- super(true);
- this.nowInSec = nowInSec;
- }
-
- @Override
- public void writePartitionKeyLivenessInfo(LivenessInfo info)
- {
- super.writePartitionKeyLivenessInfo(info);
- if (info.isLive(nowInSec))
- hasLiveData = true;
- }
-
- @Override
- public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
- {
- super.writeCell(column, isCounter, value, info, path);
-
- if (info.isLive(nowInSec))
- {
- hasLiveData = true;
- if (!info.hasTTL())
- {
- hasNonExpiringCell = true;
- ++ArrayBackedCachedPartition.this.nonExpiringLiveCells;
- }
- }
-
- if (!info.hasLocalDeletionTime() || info.hasTTL())
- ++ArrayBackedCachedPartition.this.nonTombstoneCellCount;
- }
-
- @Override
- public void endOfRow()
- {
- super.endOfRow();
- if (hasLiveData)
- ++ArrayBackedCachedPartition.this.cachedLiveRows;
- if (hasNonExpiringCell)
- ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells;
-
- hasLiveData = false;
- hasNonExpiringCell = false;
- }
- }
-
static class Serializer implements ISerializer<CachedPartition>
{
public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
@@ -210,9 +207,13 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
out.writeInt(p.createdAtInSec);
+ out.writeInt(p.cachedLiveRows);
+ out.writeInt(p.rowsWithNonExpiringCells);
+ out.writeInt(p.nonTombstoneCellCount);
+ out.writeInt(p.nonExpiringLiveCells);
try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
{
- UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rows);
+ UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rowCount());
}
}
@@ -226,18 +227,42 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
// is slightly faster.
int createdAtInSec = in.readInt();
+ int cachedLiveRows = in.readInt();
+ int rowsWithNonExpiringCells = in.readInt();
+ int nonTombstoneCellCount = in.readInt();
+ int nonExpiringLiveCells = in.readInt();
+
+ UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+ assert !header.isReversed && header.rowEstimate >= 0;
- UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
- assert !h.isReversed && h.rowEstimate >= 0;
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false);
+ List<Row> rows = new ArrayList<>(header.rowEstimate);
- ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.rowEstimate, false, createdAtInSec);
- partition.staticRow = h.staticRow;
+ try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL, header))
+ {
+ while (partition.hasNext())
+ {
+ Unfiltered unfiltered = partition.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ rows.add((Row)unfiltered);
+ else
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+ }
+ }
- Writer writer = partition.new Writer(createdAtInSec);
- RangeTombstoneMarker.Writer markerWriter = partition.new RangeTombstoneCollector(false);
+ return new ArrayBackedCachedPartition(header.metadata,
+ header.key,
+ header.sHeader.columns(),
+ header.staticRow,
+ rows,
+ deletionBuilder.build(),
+ header.sHeader.stats(),
+ createdAtInSec,
+ cachedLiveRows,
+ rowsWithNonExpiringCells,
+ nonTombstoneCellCount,
+ nonExpiringLiveCells);
- UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(MessagingService.current_version, SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter);
- return partition;
}
public long serializedSize(CachedPartition partition)
@@ -248,7 +273,11 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
{
return TypeSizes.sizeof(p.createdAtInSec)
- + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rows);
+ + TypeSizes.sizeof(p.cachedLiveRows)
+ + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
+ + TypeSizes.sizeof(p.nonTombstoneCellCount)
+ + TypeSizes.sizeof(p.nonExpiringLiveCells)
+ + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
index d7f3a88..4485117 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
@@ -17,28 +17,30 @@
*/
package org.apache.cassandra.db.partitions;
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
-public class ArrayBackedPartition extends AbstractPartitionData
+public class ArrayBackedPartition extends AbstractThreadUnsafePartition
{
+ private final Row staticRow;
+ private final DeletionInfo deletionInfo;
+ private final RowStats stats;
+
protected ArrayBackedPartition(CFMetaData metadata,
DecoratedKey partitionKey,
- DeletionTime deletionTime,
PartitionColumns columns,
- int initialRowCapacity,
- boolean sortable)
+ Row staticRow,
+ List<Row> rows,
+ DeletionInfo deletionInfo,
+ RowStats stats)
{
- super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+ super(metadata, partitionKey, columns, rows);
+ this.staticRow = staticRow;
+ this.deletionInfo = deletionInfo;
+ this.stats = stats;
}
/**
@@ -52,7 +54,7 @@ public class ArrayBackedPartition extends AbstractPartitionData
*/
public static ArrayBackedPartition create(UnfilteredRowIterator iterator)
{
- return create(iterator, 4);
+ return create(iterator, 16);
}
/**
@@ -68,37 +70,45 @@ public class ArrayBackedPartition extends AbstractPartitionData
*/
public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity)
{
- ArrayBackedPartition partition = new ArrayBackedPartition(iterator.metadata(),
- iterator.partitionKey(),
- iterator.partitionLevelDeletion(),
- iterator.columns(),
- initialRowCapacity,
- iterator.isReverseOrder());
-
- partition.staticRow = iterator.staticRow().takeAlias();
-
- Writer writer = partition.new Writer(true);
- RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder());
+ CFMetaData metadata = iterator.metadata();
+ boolean reversed = iterator.isReverseOrder();
- copyAll(iterator, writer, markerCollector, partition);
-
- return partition;
- }
+ List<Row> rows = new ArrayList<>(initialRowCapacity);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
- protected static void copyAll(UnfilteredRowIterator iterator, Writer writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition)
- {
while (iterator.hasNext())
{
Unfiltered unfiltered = iterator.next();
if (unfiltered.kind() == Unfiltered.Kind.ROW)
- ((Row) unfiltered).copyTo(writer);
+ rows.add((Row)unfiltered);
else
- ((RangeTombstoneMarker) unfiltered).copyTo(markerCollector);
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
}
- // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering
- // order. So if we've just added them in reverse clustering order, reverse them.
- if (iterator.isReverseOrder())
- partition.reverse();
+ if (reversed)
+ Collections.reverse(rows);
+
+ return new ArrayBackedPartition(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), iterator.stats());
+ }
+
+ protected boolean canHaveShadowedData()
+ {
+ // We only create instances from UnfilteredRowIterator that don't have shadowed data
+ return false;
+ }
+
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return deletionInfo;
+ }
+
+ public RowStats stats()
+ {
+ return stats;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 6a888a6..1361422 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -26,11 +26,8 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -89,10 +86,7 @@ public class AtomicBTreePartition implements Partition
private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker");
- private static final DeletionInfo LIVE = DeletionInfo.live();
- // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
- // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
- private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS);
+ private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, RowStats.NO_STATS);
private final CFMetaData metadata;
private final DecoratedKey partitionKey;
@@ -154,146 +148,56 @@ public class AtomicBTreePartition implements Partition
return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
}
+ private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
+ {
+ DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+ if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive()))
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata);
+ return row == null ? Rows.EMPTY_STATIC_ROW : row;
+ }
+
public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
{
// TODO: we could optimize comparison for "NativeRow" à la #6755
final Holder current = ref;
return new SearchIterator<Clustering, Row>()
{
- private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
- private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
- private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
- private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
+ private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+ private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
public boolean hasNext()
{
return rawIter.hasNext();
}
- public Row next(Clustering key)
+ public Row next(Clustering clustering)
{
- if (key == Clustering.STATIC_CLUSTERING)
- return makeStatic(columns, current, allocator);
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return staticRow(current, columns, true);
- MemtableRowData data = rawIter.next(key);
- // We also need to find if there is a range tombstone covering this key
- RangeTombstone rt = current.deletionInfo.rangeCovering(key);
+ Row row = rawIter.next(clustering);
+ RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
- if (data == null)
- {
- // If we have a range tombstone but not data, "fake" the RT by return a row deletion
- // corresponding to the tombstone.
- if (rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion)
- return filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, rt.deletionTime()));
- return null;
- }
+ // A search iterator only returns a row, so it cannot directly account for a deletion that should apply to that row
+ // (the partition deletion or the deletion of a range tombstone that covers it). So if need be, reuse the row deletion
+ // to carry the proper deletion on the row.
+ DeletionTime activeDeletion = partitionDeletion;
+ if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+ activeDeletion = rt.deletionTime();
- row.setTo(data);
+ if (row == null)
+ return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);
- filter.setRowDeletion(null);
- if (rt == null || rt.deletionTime().markedForDeleteAt() < partitionDeletion)
- {
- filter.setDeletionTimestamp(partitionDeletion);
- }
- else
- {
- filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
- // If we have a range tombstone covering that row and it's bigger than the row deletion itself, then
- // we replace the row deletion by the tombstone deletion as a way to return the tombstone.
- if (rt.deletionTime().supersedes(row.deletion()))
- filter.setRowDeletion(rt.deletionTime());
- }
-
- return filter.setTo(row);
- }
- };
- }
-
- private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion)
- {
- return new AbstractRow()
- {
- public Columns columns()
- {
- return Columns.NONE;
- }
-
- public LivenessInfo primaryKeyLivenessInfo()
- {
- return LivenessInfo.NONE;
- }
-
- public DeletionTime deletion()
- {
- return deletion;
- }
-
- public boolean isEmpty()
- {
- return true;
- }
-
- public boolean hasComplexDeletion()
- {
- return false;
- }
-
- public Clustering clustering()
- {
- return clustering;
- }
-
- public Cell getCell(ColumnDefinition c)
- {
- return null;
- }
-
- public Cell getCell(ColumnDefinition c, CellPath path)
- {
- return null;
- }
-
- public Iterator<Cell> getCells(ColumnDefinition c)
- {
- return null;
- }
-
- public DeletionTime getDeletion(ColumnDefinition c)
- {
- return DeletionTime.LIVE;
- }
-
- public Iterator<Cell> iterator()
- {
- return Iterators.<Cell>emptyIterator();
- }
-
- public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
- {
- return new SearchIterator<ColumnDefinition, ColumnData>()
- {
- public boolean hasNext()
- {
- return false;
- }
-
- public ColumnData next(ColumnDefinition column)
- {
- return null;
- }
- };
- }
-
- public Row takeAlias()
- {
- return this;
+ return row.filter(columns, activeDeletion, true, metadata);
}
};
}
public UnfilteredRowIterator unfilteredIterator()
{
- return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false);
+ return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
}
public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
@@ -309,7 +213,7 @@ public class AtomicBTreePartition implements Partition
partitionKey,
partitionDeletion,
selection.fetchedColumns(),
- makeStatic(selection, current, allocator),
+ staticRow(current, selection, false),
reversed,
current.stats)
{
@@ -320,189 +224,51 @@ public class AtomicBTreePartition implements Partition
};
}
+ Holder current = ref;
+ Row staticRow = staticRow(current, selection, false);
return slices.size() == 1
- ? new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator)
- : new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator);
+ ? sliceIterator(selection, slices.get(0), reversed, current, staticRow)
+ : new SlicesIterator(metadata, partitionKey, selection, slices, reversed, current, staticRow);
}
- private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator)
+ private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow)
{
- Columns statics = selection.fetchedColumns().statics;
- if (statics.isEmpty() || holder.staticRow == null)
- return Rows.EMPTY_STATIC_ROW;
-
- return new ReusableFilteringRow(statics, selection)
- .setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt())
- .setTo(allocator.newReusableRow().setTo(holder.staticRow));
+ Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+ Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
+ Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, !reversed);
+
+ return new RowAndDeletionMergeIterator(metadata,
+ partitionKey,
+ current.deletionInfo.getPartitionDeletion(),
+ selection,
+ staticRow,
+ reversed,
+ current.stats,
+ rowIter,
+ current.deletionInfo.rangeIterator(slice, reversed),
+ true);
}
- private static class ReusableFilteringRow extends FilteringRow
+ public class SlicesIterator extends AbstractUnfilteredRowIterator
{
- private final Columns columns;
- private final ColumnFilter selection;
- private ColumnFilter.Tester tester;
- private long deletionTimestamp;
-
- // Used by searchIterator in case the row is covered by a tombstone.
- private DeletionTime rowDeletion;
-
- public ReusableFilteringRow(Columns columns, ColumnFilter selection)
- {
- this.columns = columns;
- this.selection = selection;
- }
-
- public ReusableFilteringRow setDeletionTimestamp(long timestamp)
- {
- this.deletionTimestamp = timestamp;
- return this;
- }
-
- public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion)
- {
- this.rowDeletion = rowDeletion;
- return this;
- }
-
- @Override
- public DeletionTime deletion()
- {
- return rowDeletion == null ? super.deletion() : rowDeletion;
- }
-
- @Override
- protected boolean include(LivenessInfo info)
- {
- return info.timestamp() > deletionTimestamp;
- }
-
- @Override
- protected boolean include(ColumnDefinition def)
- {
- return columns.contains(def);
- }
-
- @Override
- protected boolean include(DeletionTime dt)
- {
- return dt.markedForDeleteAt() > deletionTimestamp;
- }
-
- @Override
- protected boolean include(ColumnDefinition c, DeletionTime dt)
- {
- return dt.markedForDeleteAt() > deletionTimestamp;
- }
-
- @Override
- protected boolean include(Cell cell)
- {
- return selection.includes(cell);
- }
- }
-
- private static class SingleSliceIterator extends AbstractUnfilteredRowIterator
- {
- private final Iterator<Unfiltered> iterator;
- private final ReusableFilteringRow row;
-
- private SingleSliceIterator(CFMetaData metadata,
- DecoratedKey key,
- Holder holder,
- ColumnFilter selection,
- Slice slice,
- boolean isReversed,
- MemtableAllocator allocator)
- {
- super(metadata,
- key,
- holder.deletionInfo.getPartitionDeletion(),
- selection.fetchedColumns(),
- makeStatic(selection, holder, allocator),
- isReversed,
- holder.stats);
-
- Iterator<Row> rowIter = rowIter(metadata,
- holder,
- slice,
- !isReversed,
- allocator);
-
- this.iterator = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed)
- .setTo(rowIter, holder.deletionInfo.rangeIterator(slice, isReversed));
-
- this.row = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection)
- .setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt());
- }
-
- private Iterator<Row> rowIter(CFMetaData metadata,
- Holder holder,
- Slice slice,
- boolean forwards,
- final MemtableAllocator allocator)
- {
- Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
- Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
- final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards);
- return new AbstractIterator<Row>()
- {
- private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
-
- protected Row computeNext()
- {
- return dataIter.hasNext() ? row.setTo(dataIter.next()) : endOfData();
- }
- };
- }
-
- protected Unfiltered computeNext()
- {
- while (iterator.hasNext())
- {
- Unfiltered next = iterator.next();
- if (next.kind() == Unfiltered.Kind.ROW)
- {
- row.setTo((Row)next);
- if (!row.isEmpty())
- return row;
- }
- else
- {
- RangeTombstoneMarker marker = (RangeTombstoneMarker)next;
-
- long deletion = partitionLevelDeletion().markedForDeleteAt();
- if (marker.isOpen(isReverseOrder()))
- deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt());
- row.setDeletionTimestamp(deletion);
- return marker;
- }
- }
- return endOfData();
- }
- }
-
- public static class SlicesIterator extends AbstractUnfilteredRowIterator
- {
- private final Holder holder;
- private final MemtableAllocator allocator;
+ private final Holder current;
private final ColumnFilter selection;
private final Slices slices;
private int idx;
- private UnfilteredRowIterator currentSlice;
+ private Iterator<Unfiltered> currentSlice;
private SlicesIterator(CFMetaData metadata,
DecoratedKey key,
- Holder holder,
ColumnFilter selection,
Slices slices,
boolean isReversed,
- MemtableAllocator allocator)
+ Holder holder,
+ Row staticRow)
{
- super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), makeStatic(selection, holder, allocator), isReversed, holder.stats);
- this.holder = holder;
+ super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), staticRow, isReversed, holder.stats);
+ this.current = holder;
this.selection = selection;
- this.allocator = allocator;
this.slices = slices;
}
@@ -516,13 +282,7 @@ public class AtomicBTreePartition implements Partition
return endOfData();
int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
- currentSlice = new SingleSliceIterator(metadata,
- partitionKey,
- holder,
- selection,
- slices.get(sliceIdx),
- isReverseOrder,
- allocator);
+ currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
idx++;
}
@@ -565,7 +325,7 @@ public class AtomicBTreePartition implements Partition
if (inputDeletionInfoCopy == null)
inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance);
- deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
+ deletionInfo = current.deletionInfo.mutableCopy().add(inputDeletionInfoCopy);
updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
}
else
@@ -574,9 +334,9 @@ public class AtomicBTreePartition implements Partition
}
Row newStatic = update.staticRow();
- MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW
- ? current.staticRow
- : (current.staticRow == null ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
+ Row staticRow = newStatic.isEmpty()
+ ? current.staticRow
+ : (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
RowStats newStats = current.stats.mergeWith(update.stats());
@@ -661,10 +421,10 @@ public class AtomicBTreePartition implements Partition
final DeletionInfo deletionInfo;
// the btree of rows
final Object[] tree;
- final MemtableRowData staticRow;
+ final Row staticRow;
final RowStats stats;
- Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats)
+ Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, RowStats stats)
{
this.tree = tree;
this.deletionInfo = deletionInfo;
@@ -679,7 +439,7 @@ public class AtomicBTreePartition implements Partition
}
// the function we provide to the btree utilities to perform any column replacements
- private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData>
+ private static final class RowUpdater implements UpdateFunction<Row, Row>
{
final AtomicBTreePartition updating;
final MemtableAllocator allocator;
@@ -687,13 +447,13 @@ public class AtomicBTreePartition implements Partition
final Updater indexer;
final int nowInSec;
Holder ref;
+ Row.Builder regularBuilder;
long dataSize;
long heapSize;
long colUpdateTimeDelta = Long.MAX_VALUE;
- final MemtableRowData.ReusableRow row;
final MemtableAllocator.DataReclaimer reclaimer;
- final MemtableAllocator.RowAllocator rowAllocator;
- List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree
+ List<Row> inserted; // TODO: replace with walk of aborted BTree
+
private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
@@ -702,18 +462,25 @@ public class AtomicBTreePartition implements Partition
this.writeOp = writeOp;
this.indexer = indexer;
this.nowInSec = FBUtilities.nowInSeconds();
- this.row = allocator.newReusableRow();
this.reclaimer = allocator.reclaimer();
- this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp);
}
- public MemtableRowData apply(Row insert)
+ private Row.Builder builder(Clustering clustering)
{
- rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic());
- insert.copyTo(rowAllocator);
- MemtableRowData data = rowAllocator.allocatedRowData();
+ boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
+ // We know we only insert/update one static row per PartitionUpdate, so there is no point in saving the builder
+ if (isStatic)
+ return allocator.rowBuilder(updating.metadata(), writeOp, true);
+
+ if (regularBuilder == null)
+ regularBuilder = allocator.rowBuilder(updating.metadata(), writeOp, false);
+ return regularBuilder;
+ }
- insertIntoIndexes(insert);
+ public Row apply(Row insert)
+ {
+ Row data = Rows.copy(insert, builder(insert.clustering())).build();
+ insertIntoIndexes(data);
this.dataSize += data.dataSize();
this.heapSize += data.unsharedHeapSizeExcludingData();
@@ -723,14 +490,14 @@ public class AtomicBTreePartition implements Partition
return data;
}
- public MemtableRowData apply(MemtableRowData existing, Row update)
+ public Row apply(Row existing, Row update)
{
Columns mergedColumns = existing.columns().mergeTo(update.columns());
- rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic());
- colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer));
+ Row.Builder builder = builder(existing.clustering());
+ colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec, indexer));
- MemtableRowData reconciled = rowAllocator.allocatedRowData();
+ Row reconciled = builder.build();
dataSize += reconciled.dataSize() - existing.dataSize();
heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
@@ -749,7 +516,7 @@ public class AtomicBTreePartition implements Partition
maybeIndexPrimaryKeyColumns(toInsert);
Clustering clustering = toInsert.clustering();
- for (Cell cell : toInsert)
+ for (Cell cell : toInsert.cells())
indexer.insert(clustering, cell);
}
@@ -761,15 +528,15 @@ public class AtomicBTreePartition implements Partition
long timestamp = row.primaryKeyLivenessInfo().timestamp();
int ttl = row.primaryKeyLivenessInfo().ttl();
- for (Cell cell : row)
+ for (Cell cell : row.cells())
{
- long cellTimestamp = cell.livenessInfo().timestamp();
+ long cellTimestamp = cell.timestamp();
if (cell.isLive(nowInSec))
{
if (cellTimestamp > timestamp)
{
timestamp = cellTimestamp;
- ttl = cell.livenessInfo().ttl();
+ ttl = cell.ttl();
}
}
}
@@ -783,19 +550,19 @@ public class AtomicBTreePartition implements Partition
this.heapSize = 0;
if (inserted != null)
{
- for (MemtableRowData row : inserted)
+ for (Row row : inserted)
abort(row);
inserted.clear();
}
reclaimer.cancel();
}
- protected void abort(MemtableRowData abort)
+ protected void abort(Row abort)
{
reclaimer.reclaimImmediately(abort);
}
- protected void discard(MemtableRowData discard)
+ protected void discard(Row discard)
{
reclaimer.reclaim(discard);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
index acaef5d..e5d1e75 100644
--- a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java
@@ -49,10 +49,10 @@ public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
@Override
public Unfiltered next()
{
- Unfiltered unfiltered = super.next();
- if (unfiltered.kind() == Unfiltered.Kind.ROW)
- counter.newRow((Row) unfiltered);
- return unfiltered;
+ Unfiltered next = super.next();
+ if (next.isRow())
+ counter.newRow((Row)next);
+ return next;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 813654d..1cac274 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -17,21 +17,24 @@
*/
package org.apache.cassandra.db.partitions;
-import java.util.Iterator;
+import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-public class FilteredPartition extends AbstractPartitionData implements Iterable<Row>
+public class FilteredPartition extends AbstractThreadUnsafePartition
{
+ private final Row staticRow;
+
private FilteredPartition(CFMetaData metadata,
DecoratedKey partitionKey,
PartitionColumns columns,
- int initialRowCapacity,
- boolean sortable)
+ Row staticRow,
+ List<Row> rows)
{
- super(metadata, partitionKey, DeletionTime.LIVE, columns, initialRowCapacity, sortable);
+ super(metadata, partitionKey, columns, rows);
+ this.staticRow = staticRow;
}
/**
@@ -42,25 +45,43 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable
*/
public static FilteredPartition create(RowIterator iterator)
{
- FilteredPartition partition = new FilteredPartition(iterator.metadata(),
- iterator.partitionKey(),
- iterator.columns(),
- 4,
- iterator.isReverseOrder());
-
- partition.staticRow = iterator.staticRow().takeAlias();
+ CFMetaData metadata = iterator.metadata();
+ boolean reversed = iterator.isReverseOrder();
- Writer writer = partition.new Writer(true);
+ List<Row> rows = new ArrayList<>();
while (iterator.hasNext())
- iterator.next().copyTo(writer);
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.isRow())
+ rows.add((Row)unfiltered);
+ }
+
+ if (reversed)
+ Collections.reverse(rows);
+
+ return new FilteredPartition(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows);
+ }
+
+ protected boolean canHaveShadowedData()
+ {
+ // We only create instances from RowIterator that don't have shadowed data (nor deletion info really)
+ return false;
+ }
- // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering
- // order. So if we've just added them in reverse clustering order, reverse them.
- if (iterator.isReverseOrder())
- partition.reverse();
+ public Row staticRow()
+ {
+ return staticRow;
+ }
- return partition;
+ public DeletionInfo deletionInfo()
+ {
+ return DeletionInfo.LIVE;
+ }
+
+ public RowStats stats()
+ {
+ return RowStats.NO_STATS;
}
public RowIterator rowIterator()
@@ -90,7 +111,7 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable
public Row staticRow()
{
- return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+ return FilteredPartition.this.staticRow();
}
public boolean hasNext()
@@ -117,26 +138,20 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable
@Override
public String toString()
{
- try (RowIterator iterator = rowIterator())
- {
- StringBuilder sb = new StringBuilder();
- CFMetaData metadata = iterator.metadata();
- PartitionColumns columns = iterator.columns();
+ StringBuilder sb = new StringBuilder();
- sb.append(String.format("[%s.%s] key=%s columns=%s reversed=%b",
- metadata.ksName,
- metadata.cfName,
- metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
- columns,
- iterator.isReverseOrder()));
+ sb.append(String.format("[%s.%s] key=%s columns=%s",
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(partitionKey().getKey()),
+ columns));
- if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
- sb.append("\n ").append(iterator.staticRow().toString(metadata));
+ if (staticRow() != Rows.EMPTY_STATIC_ROW)
+ sb.append("\n ").append(staticRow().toString(metadata));
- while (iterator.hasNext())
- sb.append("\n ").append(iterator.next().toString(metadata));
+ for (Row row : this)
+ sb.append("\n ").append(row.toString(metadata));
- return sb.toString();
- }
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
deleted file mode 100644
index c40109b..0000000
--- a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Abstract class to make it easier to write iterators that filter some
- * parts of another iterator (used for purging tombstones and removing dropped columns).
- */
-public abstract class FilteringPartitionIterator extends WrappingUnfilteredPartitionIterator
-{
- private UnfilteredRowIterator next;
-
- protected FilteringPartitionIterator(UnfilteredPartitionIterator iter)
- {
- super(iter);
- }
-
- // The filter to use for filtering row contents. Is null by default to mean no particular filtering
- // but can be overriden by subclasses. Please see FilteringAtomIterator for details on how this is used.
- protected FilteringRow makeRowFilter()
- {
- return null;
- }
-
- // Whether or not we should bother filtering the provided rows iterator. This
- // exists mainly for preformance
- protected boolean shouldFilter(UnfilteredRowIterator iterator)
- {
- return true;
- }
-
- protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
- {
- return true;
- }
-
- protected boolean includePartitionDeletion(DeletionTime dt)
- {
- return true;
- }
-
- // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called.
- protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
- {
- return marker;
- }
-
- // Called when a particular partition is skipped due to being empty post filtering
- protected void onEmpty(DecoratedKey key)
- {
- }
-
- public boolean hasNext()
- {
- while (next == null && super.hasNext())
- {
- UnfilteredRowIterator iterator = super.next();
- if (shouldFilter(iterator))
- {
- next = new FilteringIterator(iterator);
- if (!isForThrift() && next.isEmpty())
- {
- onEmpty(iterator.partitionKey());
- iterator.close();
- next = null;
- }
- }
- else
- {
- next = iterator;
- }
- }
- return next != null;
- }
-
- public UnfilteredRowIterator next()
- {
- UnfilteredRowIterator toReturn = next;
- next = null;
- return toReturn;
- }
-
- @Override
- public void close()
- {
- try
- {
- super.close();
- }
- finally
- {
- if (next != null)
- next.close();
- }
- }
-
- private class FilteringIterator extends FilteringRowIterator
- {
- private FilteringIterator(UnfilteredRowIterator iterator)
- {
- super(iterator);
- }
-
- @Override
- protected FilteringRow makeRowFilter()
- {
- return FilteringPartitionIterator.this.makeRowFilter();
- }
-
- @Override
- protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
- {
- return FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker);
- }
-
- @Override
- protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
- {
- return FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed);
- }
-
- @Override
- protected boolean includePartitionDeletion(DeletionTime dt)
- {
- return FilteringPartitionIterator.this.includePartitionDeletion(dt);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
new file mode 100644
index 0000000..510b9c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+
+public interface PartitionStatisticsCollector
+{
+ public void update(LivenessInfo info);
+ public void update(DeletionTime deletionTime);
+ public void update(Cell cell);
+ public void updateColumnSetPerRow(long columnSetInRow);
+ public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards);
+}