You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/11/18 10:56:22 UTC
[12/13] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.X
Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f33cd55a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f33cd55a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f33cd55a
Branch: refs/heads/trunk
Commit: f33cd55a5bbf9a8ba0073c606b971d3b3fc85471
Parents: 490c1c2 eb41380
Author: Branimir Lambov <br...@datastax.com>
Authored: Fri Nov 18 12:43:04 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Fri Nov 18 12:44:09 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/db/ReadCommand.java | 5 +-
.../db/compaction/CompactionController.java | 46 +++++--
.../db/compaction/CompactionIterator.java | 22 +--
.../db/compaction/CompactionManager.java | 5 +-
.../db/compaction/SSTableSplitter.java | 5 +-
.../cassandra/db/compaction/Upgrader.java | 5 +-
.../cassandra/db/compaction/Verifier.java | 5 +-
.../cassandra/db/partitions/PurgeFunction.java | 6 +-
.../db/compaction/CompactionControllerTest.java | 21 ++-
.../db/compaction/CompactionsPurgeTest.java | 138 ++++++++++++++++++-
11 files changed, 213 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6ca26f9,8a3ac65..ee73b81
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -150,6 -37,7 +150,7 @@@ Merged from 3.0
* Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
* Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
Merged from 2.2:
+ * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index b34eee6,34d093e..64c35d9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -18,13 -18,10 +18,14 @@@
package org.apache.cassandra.db.compaction;
import java.util.*;
+ import java.util.function.Predicate;
import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@@ -213,20 -194,24 +214,24 @@@ public class CompactionController imple
}
/**
- * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
- * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
- * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not
- * participating in this compaction, or memtable that contains this partition,
- * or LONG.MAX_VALUE if no SSTable or memtable exist.
+ * @param key
+ * @return a predicate for whether tombstones marked for deletion at the given time for the given partition are
+ * purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables
+ * containing this partition and not participating in the compaction. This means there isn't any data in those
+ * sstables that might still need to be suppressed by a tombstone at this timestamp.
*/
- public long maxPurgeableTimestamp(DecoratedKey key)
+ public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
{
- if (!compactingRepaired() || NEVER_PURGE_TOMBSTONES)
+ if (NEVER_PURGE_TOMBSTONES || !compactingRepaired())
- return Long.MIN_VALUE;
+ return time -> false;
- long min = Long.MAX_VALUE;
overlapIterator.update(key);
- for (SSTableReader sstable : overlapIterator.overlaps())
+ Set<SSTableReader> filteredSSTables = overlapIterator.overlaps();
+ Iterable<Memtable> memtables = cfs.getTracker().getView().getAllMemtables();
+ long minTimestampSeen = Long.MAX_VALUE;
+ boolean hasTimestamp = false;
+
- for (SSTableReader sstable : filteredSSTables)
++ for (SSTableReader sstable: filteredSSTables)
{
// if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
// we check index file instead.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index fd1393c,9f0984f..4693794
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@@ -17,9 -17,12 +17,10 @@@
*/
package org.apache.cassandra.db.compaction;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
+ import java.util.function.Predicate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Ordering;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@@ -299,251 -292,18 +299,251 @@@ public class CompactionIterator extend
}
/*
- * Tombstones with a localDeletionTime before this can be purged. This is the minimum timestamp for any sstable
- * containing `currentKey` outside of the set of sstables involved in this compaction. This is computed lazily
- * on demand as we only need this if there is tombstones and this a bit expensive (see #8914).
+ * Returns a predicate that evaluates whether a tombstone with the given deletion timestamp can be purged: it
+ * accepts timestamps older than the minimum timestamp of any sstable containing `currentKey` outside of the set
+ * of sstables involved in this compaction. The predicate is computed lazily on demand and cached in purgeEvaluator,
+ * as it is only needed when there are tombstones and computing it is somewhat expensive (see #8914).
*/
- protected long getMaxPurgeableTimestamp()
+ protected Predicate<Long> getPurgeEvaluator()
{
- if (!hasCalculatedMaxPurgeableTimestamp)
+ if (purgeEvaluator == null)
{
- hasCalculatedMaxPurgeableTimestamp = true;
- maxPurgeableTimestamp = controller.maxPurgeableTimestamp(currentKey);
+ purgeEvaluator = controller.getPurgeEvaluator(currentKey);
}
- return maxPurgeableTimestamp;
+ return purgeEvaluator;
}
}
+
+ /**
+ * Unfiltered row iterator that removes deleted data as provided by a "tombstone source" for the partition.
+ * The result produced by this iterator is such that when merged with tombSource it produces the same output
+ * as the merge of dataSource and tombSource. Inputs are walked together in comparator order; close() also closes tombSource.
+ */
+ private static class GarbageSkippingUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+ {
+ final UnfilteredRowIterator tombSource; // iterator whose deletions/newer data filter the wrapped data iterator
+ final DeletionTime partitionLevelDeletion; // dataSource's partition deletion, or LIVE if it does not supersede tombSource's
+ final Row staticRow; // dataSource's static row after garbage filtering (EMPTY_STATIC_ROW if nothing survives)
+ final ColumnFilter cf; // ColumnFilter.all(metadata): row filtering keeps every column
+ final int nowInSec; // forwarded to Rows.removeShadowedCells for cell-level GC
+ final CFMetaData metadata; // supplies the clustering comparator used to align the two inputs
+ final boolean cellLevelGC; // see constructor javadoc
+
+ DeletionTime tombOpenDeletionTime = DeletionTime.LIVE; // range deletion currently open in tombSource
+ DeletionTime dataOpenDeletionTime = DeletionTime.LIVE; // range deletion currently open in dataSource
+ DeletionTime openDeletionTime = DeletionTime.LIVE; // range deletion currently open in the output issued so far
+ DeletionTime partitionDeletionTime; // tombSource's partition-level deletion
+ DeletionTime activeDeletionTime; // max(partitionDeletionTime, tombOpenDeletionTime): the deletion currently shadowing data
+ Unfiltered tombNext = null; // lookahead element of tombSource, null once exhausted
+ Unfiltered dataNext = null; // lookahead element of dataSource, null once exhausted
+ Unfiltered next = null; // element prepared by hasNext() and not yet returned by next()
+
+ /**
+ * Construct an iterator that filters out data shadowed by the provided "tombstone source".
+ *
+ * @param dataSource The input row. The result is a filtered version of this.
+ * @param tombSource Tombstone source, i.e. iterator used to identify deleted data in the input row.
+ * @param nowInSec Current time, used in choosing the winner when cell expiration is involved.
+ * @param cellLevelGC If false, the iterator will only look at row-level deletion times and tombstones.
+ * If true, deleted or overwritten cells within a surviving row will also be removed.
+ */
+ protected GarbageSkippingUnfilteredRowIterator(UnfilteredRowIterator dataSource, UnfilteredRowIterator tombSource, int nowInSec, boolean cellLevelGC)
+ {
+ super(dataSource);
+ this.tombSource = tombSource;
+ this.nowInSec = nowInSec;
+ this.cellLevelGC = cellLevelGC;
+ metadata = dataSource.metadata();
+ cf = ColumnFilter.all(metadata);
+
+ activeDeletionTime = partitionDeletionTime = tombSource.partitionLevelDeletion(); // no range is open yet, so only the partition deletion is active
+
+ // Only preserve partition level deletion if not shadowed. (Note: Shadowing deletion must not be copied.)
+ this.partitionLevelDeletion = dataSource.partitionLevelDeletion().supersedes(tombSource.partitionLevelDeletion()) ?
+ dataSource.partitionLevelDeletion() :
+ DeletionTime.LIVE;
+
+ Row dataStaticRow = garbageFilterRow(dataSource.staticRow(), tombSource.staticRow()); // may be null, handled below
+ this.staticRow = dataStaticRow != null ? dataStaticRow : Rows.EMPTY_STATIC_ROW;
+
+ tombNext = advance(tombSource); // prime the one-element lookahead of both inputs
+ dataNext = advance(dataSource);
+ }
+
+ private static Unfiltered advance(UnfilteredRowIterator source) // next element of source, or null once it is exhausted
+ {
+ return source.hasNext() ? source.next() : null;
+ }
+
+ @Override
+ public DeletionTime partitionLevelDeletion()
+ {
+ return partitionLevelDeletion;
+ }
+
+ public void close() // closes the wrapped data iterator and tombSource
+ {
+ super.close();
+ tombSource.close();
+ }
+
+ @Override
+ public Row staticRow()
+ {
+ return staticRow;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ // Produce the next element. This may consume multiple elements from both inputs until we find something
+ // from dataSource that is still live. We track the currently open deletion in both sources, as well as the
+ // one we have last issued to the output. The tombOpenDeletionTime is used to filter out content; the others
+ // to decide whether or not a tombstone is superseded, and to be able to surface (the rest of) a deletion
+ // range from the input when a suppressing deletion ends.
+ while (next == null && dataNext != null)
+ {
+ int cmp = tombNext == null ? -1 : metadata.comparator.compare(dataNext, tombNext); // tombSource exhausted: data comes first
+ if (cmp < 0) // data element has no counterpart in tombSource: filter by the active deletion only
+ {
+ if (dataNext.isRow())
+ next = ((Row) dataNext).filter(cf, activeDeletionTime, false, metadata);
+ else
+ next = processDataMarker();
+ }
+ else if (cmp == 0) // both inputs are at the same clustering position
+ {
+ if (dataNext.isRow())
+ {
+ next = garbageFilterRow((Row) dataNext, (Row) tombNext); // NOTE(review): assumes a row never compares equal to a marker
+ }
+ else
+ {
+ tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext); // update the shadowing side before processing the data marker
+ activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+ tombOpenDeletionTime);
+ next = processDataMarker();
+ }
+ }
+ else // (cmp > 0): tombSource is ahead; only its range markers change state, its rows are skipped
+ {
+ if (tombNext.isRangeTombstoneMarker())
+ {
+ tombOpenDeletionTime = updateOpenDeletionTime(tombOpenDeletionTime, tombNext);
+ activeDeletionTime = Ordering.natural().max(partitionDeletionTime,
+ tombOpenDeletionTime);
+ boolean supersededBefore = openDeletionTime.isLive(); // no range is currently open in the output
+ boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime); // data's open range (if any) is shadowed from here on
+ // If a range open was not issued because it was superseded and the deletion isn't superseded any more, we need to open it now.
+ if (supersededBefore && !supersededAfter)
+ next = new RangeTombstoneBoundMarker(((RangeTombstoneMarker) tombNext).closeBound(false).invert(), dataOpenDeletionTime);
+ // If the deletion begins to be superseded, we don't close the range yet. This can save us a close/open pair if it ends after the superseding range.
+ }
+ }
+
+ if (next instanceof RangeTombstoneMarker) // track the range currently open in the output
+ openDeletionTime = updateOpenDeletionTime(openDeletionTime, next);
+
+ if (cmp <= 0) // advance whichever input(s) held the smaller position
+ dataNext = advance(wrapped);
+ if (cmp >= 0)
+ tombNext = advance(tombSource);
+ }
+ return next != null;
+ }
+
+ protected Row garbageFilterRow(Row dataRow, Row tombRow) // returns the part of dataRow that is not shadowed; may be null
+ {
+ if (cellLevelGC)
+ {
+ return Rows.removeShadowedCells(dataRow, tombRow, activeDeletionTime, nowInSec); // cell-level: also drops shadowed cells
+ }
+ else
+ {
+ DeletionTime deletion = Ordering.natural().max(tombRow.deletion().time(), // row-level: use the stronger of tombRow's row deletion
+ activeDeletionTime); // and the currently active deletion
+ return dataRow.filter(cf, deletion, false, metadata);
+ }
+ }
+
+ /**
+ * Decide how to act on a tombstone marker from the input iterator. We can decide what to issue depending on
+ * whether or not the ranges before and after the marker are superseded/live -- if none are, we can reuse the
+ * marker; if both are, the marker can be ignored; otherwise we issue a corresponding start/end marker.
+ */
+ private RangeTombstoneMarker processDataMarker()
+ {
+ dataOpenDeletionTime = updateOpenDeletionTime(dataOpenDeletionTime, dataNext); // track the range open in dataSource
+ boolean supersededBefore = openDeletionTime.isLive();
+ boolean supersededAfter = !dataOpenDeletionTime.supersedes(activeDeletionTime);
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) dataNext;
+ if (!supersededBefore) // an output range is currently open
+ if (!supersededAfter)
+ return marker; // still visible afterwards: reuse the marker as-is
+ else
+ return new RangeTombstoneBoundMarker(marker.closeBound(false), marker.closeDeletionTime(false)); // becomes shadowed: only close
+ else
+ if (!supersededAfter)
+ return new RangeTombstoneBoundMarker(marker.openBound(false), marker.openDeletionTime(false)); // stops being shadowed: only open
+ else
+ return null; // shadowed on both sides: emit nothing
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ if (!hasNext())
+ throw new IllegalStateException(); // NOTE(review): java.util.Iterator specifies NoSuchElementException here
+
+ Unfiltered v = next;
+ next = null;
+ return v;
+ }
+
+ private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfiltered next) // deletion left open after the marker, or LIVE
+ {
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+ assert openDeletionTime.isLive() == !marker.isClose(false); // a close is expected exactly when a deletion is open
+ assert openDeletionTime.isLive() || openDeletionTime.equals(marker.closeDeletionTime(false)); // and it must close that deletion
+ return marker.isOpen(false) ? marker.openDeletionTime(false) : DeletionTime.LIVE;
+ }
+ }
+
+ /**
+ * Partition transformation applying GarbageSkippingUnfilteredRowIterator, obtaining tombstone sources for each
+ * partition using the controller's shadowSources method. Partitions without any non-empty source pass through unchanged.
+ */
+ private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
+ {
+ final int nowInSec; // forwarded to the merge of shadow sources and to the wrapping iterator
+ final CompactionController controller; // supplies the shadow sources and the tombstone option
+ final boolean cellLevelGC; // true iff controller.tombstoneOption is CELL
+
+ private GarbageSkipper(CompactionController controller, int nowInSec)
+ {
+ this.controller = controller;
+ this.nowInSec = nowInSec;
+ cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
+ }
+
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ Iterable<UnfilteredRowIterator> sources = controller.shadowSources(partition.partitionKey(), !cellLevelGC); // NOTE(review): meaning of the flag is defined in CompactionController — confirm
+ if (sources == null) // no shadow sources: leave the partition as is
+ return partition;
+ List<UnfilteredRowIterator> iters = new ArrayList<>();
+ for (UnfilteredRowIterator iter : sources) // keep non-empty sources; close empty ones now since they are not merged
+ {
+ if (!iter.isEmpty())
+ iters.add(iter);
+ else
+ iter.close();
+ }
+ if (iters.isEmpty())
+ return partition;
+
+ return new GarbageSkippingUnfilteredRowIterator(partition, UnfilteredRowIterators.merge(iters, nowInSec), nowInSec, cellLevelGC); // the wrapper's close() closes the merged source
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a0dc8c9,a77cefb..bc72fd8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -22,7 -22,7 +22,8 @@@ import java.io.IOException
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
+ import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f33cd55a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------