You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/11 08:31:46 UTC
[1/2] cassandra git commit: Add option to only purge tombstones from
repaired sstables
Repository: cassandra
Updated Branches:
refs/heads/trunk e8ac7edb7 -> b1c739874
Add option to only purge tombstones from repaired sstables
Patch by marcuse; reviewed by yukim for CASSANDRA-6434
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f0c12f3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f0c12f3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f0c12f3
Branch: refs/heads/trunk
Commit: 6f0c12f3a4668a5dcae162969843f02498ee7e6d
Parents: 76ca697
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jul 7 07:50:11 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Aug 11 08:25:55 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 6 +
pylib/cqlshlib/cql3handling.py | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 93 ++++--
.../cassandra/db/PartitionRangeReadCommand.java | 13 +-
.../org/apache/cassandra/db/ReadCommand.java | 4 +-
.../db/SinglePartitionNamesCommand.java | 16 +-
.../db/SinglePartitionSliceCommand.java | 15 +-
.../compaction/AbstractCompactionStrategy.java | 4 +-
.../db/compaction/CompactionController.java | 20 +-
.../db/compaction/CompactionIterator.java | 2 +-
.../compaction/CompactionStrategyManager.java | 5 +
.../db/partitions/PurgingPartitionIterator.java | 5 +-
.../db/RepairedDataTombstonesTest.java | 292 +++++++++++++++++++
14 files changed, 436 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b23c2d1..b882c23 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-beta1
+ * Add option to only purge repaired tombstones (CASSANDRA-6434)
* Change authorization handling for MVs (CASSANDRA-9927)
* Add custom JMX enabled executor for UDF sandbox (CASSANDRA-10026)
* Fix row deletion bug for Materialized Views (CASSANDRA-10014)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 65df769..26fe902 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,12 @@ New features
for non-primary key queries, and perform much better for indexing high
cardinality columns.
See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
+ - Option to not purge unrepaired tombstones. To avoid users having data resurrected
+ if repair has not been run within gc_grace_seconds, an option has been added to
+ only allow tombstones from repaired sstables to be purged. To enable, set the
+ compaction option 'only_purge_repaired_tombstones':true, but keep in mind that if
+ you do not run repair for a long time, you will keep all tombstones around which
+ can cause other problems.
Upgrading
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index a46da91..44a1e23 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -69,7 +69,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
# (CQL3 option name, schema_columnfamilies column name (or None if same),
# list of known map keys)
('compaction', 'compaction_strategy_options',
- ('class', 'max_threshold', 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction')),
+ ('class', 'max_threshold', 'tombstone_compaction_interval', 'tombstone_threshold', 'enabled', 'unchecked_tombstone_compaction', 'only_purge_repaired_tombstones')),
('compression', 'compression_parameters',
('sstable_compression', 'chunk_length_kb', 'crc_check_chance')),
('caching', None,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 2db0ce9..1b30fc7 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -260,7 +260,7 @@ public class Memtable implements Comparable<Memtable>
100 * allocator.onHeap().ownershipRatio(), 100 * allocator.offHeap().ownershipRatio());
}
- public UnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
+ public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
{
AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
@@ -278,35 +278,26 @@ public class Memtable implements Comparable<Memtable>
? partitions.tailMap(keyRange.left, includeStart)
: partitions.subMap(keyRange.left, includeStart, keyRange.right, includeStop);
- final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
+ int minLocalDeletionTime = Integer.MAX_VALUE;
- return new AbstractUnfilteredPartitionIterator()
- {
- public boolean isForThrift()
- {
- return isForThrift;
- }
+ // avoid iterating over the memtable if we purge all tombstones
+ if (cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
+ minLocalDeletionTime = findMinLocalDeletionTime(subMap.entrySet().iterator());
- public CFMetaData metadata()
- {
- return cfs.metadata;
- }
+ final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter = subMap.entrySet().iterator();
- public boolean hasNext()
- {
- return iter.hasNext();
- }
+ return new MemtableUnfilteredPartitionIterator(cfs, iter, isForThrift, minLocalDeletionTime, columnFilter, dataRange);
+ }
- public UnfilteredRowIterator next()
- {
- Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iter.next();
- // Actual stored key should be true DecoratedKey
- assert entry.getKey() instanceof DecoratedKey;
- DecoratedKey key = (DecoratedKey)entry.getKey();
- ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
- return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
- }
- };
+ private int findMinLocalDeletionTime(Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iterator)
+ {
+ int minLocalDeletionTime = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iterator.next();
+ minLocalDeletionTime = Math.min(minLocalDeletionTime, entry.getValue().stats().minLocalDeletionTime);
+ }
+ return minLocalDeletionTime;
}
public Partition getPartition(DecoratedKey key)
@@ -463,6 +454,56 @@ public class Memtable implements Comparable<Memtable>
}
}
+ public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator
+ {
+ private final ColumnFamilyStore cfs;
+ private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter;
+ private final boolean isForThrift;
+ private final int minLocalDeletionTime;
+ private final ColumnFilter columnFilter;
+ private final DataRange dataRange;
+
+ public MemtableUnfilteredPartitionIterator(ColumnFamilyStore cfs, Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter, boolean isForThrift, int minLocalDeletionTime, ColumnFilter columnFilter, DataRange dataRange)
+ {
+ this.cfs = cfs;
+ this.iter = iter;
+ this.isForThrift = isForThrift;
+ this.minLocalDeletionTime = minLocalDeletionTime;
+ this.columnFilter = columnFilter;
+ this.dataRange = dataRange;
+ }
+
+ public boolean isForThrift()
+ {
+ return isForThrift;
+ }
+
+ public int getMinLocalDeletionTime()
+ {
+ return minLocalDeletionTime;
+ }
+
+ public CFMetaData metadata()
+ {
+ return cfs.metadata;
+ }
+
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ Map.Entry<PartitionPosition, AtomicBTreePartition> entry = iter.next();
+ // Actual stored key should be true DecoratedKey
+ assert entry.getKey() instanceof DecoratedKey;
+ DecoratedKey key = (DecoratedKey)entry.getKey();
+ ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
+ return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
+ }
+ }
+
private static class ColumnsCollector
{
private final HashMap<ColumnDefinition, AtomicBoolean> predefined = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 2219a84..e7288cc 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -53,6 +53,7 @@ public class PartitionRangeReadCommand extends ReadCommand
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
private final DataRange dataRange;
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
public PartitionRangeReadCommand(boolean isDigest,
boolean isForThrift,
@@ -172,7 +173,8 @@ public class PartitionRangeReadCommand extends ReadCommand
for (Memtable memtable : view.memtables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
- UnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+ Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
}
@@ -181,8 +183,9 @@ public class PartitionRangeReadCommand extends ReadCommand
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
}
-
return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
}
catch (RuntimeException | Error e)
@@ -200,6 +203,12 @@ public class PartitionRangeReadCommand extends ReadCommand
}
}
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
{
return new WrappingUnfilteredPartitionIterator(iter)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 913a1de..5c40492 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -249,6 +249,8 @@ public abstract class ReadCommand implements ReadQuery
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
+ protected abstract int oldestUnrepairedTombstone();
+
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{
return isDigestQuery()
@@ -426,7 +428,7 @@ public abstract class ReadCommand implements ReadQuery
// are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
{
- return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
+ return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
{
protected long getMaxPurgeableTimestamp()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index b0958fc..518e299 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
*/
public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
{
+ private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
protected SinglePartitionNamesCommand(boolean isDigest,
boolean isForThrift,
CFMetaData metadata,
@@ -84,6 +85,12 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
}
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedDeletionTime;
+ }
+
protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
{
Tracing.trace("Acquiring sstable references");
@@ -107,7 +114,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
UnfilteredRowIterator clonedFilter = copyOnHeap
? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
: iter;
- result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result);
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, false);
}
}
@@ -137,7 +144,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
continue;
sstablesIterated++;
- result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result);
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, sstable.isRepaired());
}
}
@@ -175,8 +182,11 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
}
- private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result)
+ private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result, boolean isRepaired)
{
+ if (!isRepaired)
+ oldestUnrepairedDeletionTime = Math.min(oldestUnrepairedDeletionTime, iter.stats().minLocalDeletionTime);
+
int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1);
if (result == null)
return ArrayBackedPartition.create(iter, maxRows);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index bb9a35e..2dbf7b1 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -41,6 +41,8 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
*/
public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter>
{
+ private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
public SinglePartitionSliceCommand(boolean isDigest,
boolean isForThrift,
CFMetaData metadata,
@@ -119,6 +121,12 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
}
+ @Override
+ protected int oldestUnrepairedTombstone()
+ {
+ return oldestUnrepairedTombstone;
+ }
+
protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
{
Tracing.trace("Acquiring sstable references");
@@ -139,9 +147,9 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
@SuppressWarnings("resource") // same as above
UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
}
-
/*
* We can't eliminate full sstables based on the timestamp of what we've already read like
* in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
@@ -185,6 +193,9 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
sstable.incrementReadCount();
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
sstablesIterated++;
@@ -205,6 +216,8 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
{
iterators.add(iter);
+ if (!sstable.isRepaired())
+ oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
includedDueToTombstones++;
sstablesIterated++;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 4279f6e..d9c9ea3 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +38,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -64,6 +62,7 @@ public abstract class AbstractCompactionStrategy
// disable range overlap check when deciding if an SSTable is candidate for tombstone compaction (CASSANDRA-6563)
protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
protected static final String COMPACTION_ENABLED = "enabled";
+ public static final String ONLY_PURGE_REPAIRED_TOMBSTONES = "only_purge_repaired_tombstones";
protected Map<String, String> options;
@@ -453,6 +452,7 @@ public abstract class AbstractCompactionStrategy
uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
uncheckedOptions.remove(UNCHECKED_TOMBSTONE_COMPACTION_OPTION);
uncheckedOptions.remove(COMPACTION_ENABLED);
+ uncheckedOptions.remove(ONLY_PURGE_REPAIRED_TOMBSTONES);
return uncheckedOptions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1e91dca..179d12d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -20,14 +20,13 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
+import com.google.common.collect.Iterables;
+
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.utils.AlwaysPresentFilter;
@@ -45,6 +44,7 @@ public class CompactionController implements AutoCloseable
private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
public final ColumnFamilyStore cfs;
+ private final boolean compactingRepaired;
private Refs<SSTableReader> overlappingSSTables;
private OverlapIterator<PartitionPosition, SSTableReader> overlapIterator;
private final Iterable<SSTableReader> compacting;
@@ -56,12 +56,13 @@ public class CompactionController implements AutoCloseable
this(cfs, null, maxValue);
}
- public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
+ public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore)
{
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
this.compacting = compacting;
+ compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
refreshOverlaps();
}
@@ -117,6 +118,9 @@ public class CompactionController implements AutoCloseable
if (compacting == null)
return Collections.<SSTableReader>emptySet();
+ if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones() && !Iterables.all(compacting, SSTableReader::isRepaired))
+ return Collections.emptySet();
+
List<SSTableReader> candidates = new ArrayList<>();
long minTimestamp = Long.MAX_VALUE;
@@ -177,6 +181,9 @@ public class CompactionController implements AutoCloseable
*/
public long maxPurgeableTimestamp(DecoratedKey key)
{
+ if (!compactingRepaired())
+ return Long.MIN_VALUE;
+
long min = Long.MAX_VALUE;
overlapIterator.update(key);
for (SSTableReader sstable : overlapIterator.overlaps())
@@ -201,4 +208,9 @@ public class CompactionController implements AutoCloseable
overlappingSSTables.release();
}
+ public boolean compactingRepaired()
+ {
+ return !cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones() || compactingRepaired;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index a1a9d25..cab96fb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -252,7 +252,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
{
- super(toPurge, controller.gcBefore);
+ super(toPurge, controller.gcBefore, controller.compactingRepaired() ? Integer.MAX_VALUE : Integer.MIN_VALUE, controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); // all-repaired input has no unrepaired tombstones to protect (MAX_VALUE, as in the read path); otherwise MIN_VALUE holds back every tombstone
this.controller = controller;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7204da0..f5097af 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -479,4 +479,9 @@ public class CompactionStrategyManager implements INotificationConsumer
{
return params;
}
+
+ public boolean onlyPurgeRepairedTombstones()
+ {
+ return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
index 492fe1d..e53e17b 100644
--- a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
+++ b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
@@ -27,7 +27,7 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
private UnfilteredRowIterator next;
- public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore)
+ public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones)
{
super(iterator);
this.gcBefore = gcBefore;
@@ -35,6 +35,9 @@ public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartiti
{
public boolean shouldPurge(long timestamp, int localDeletionTime)
{
+ if (onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
+ return false;
+
return timestamp < getMaxPurgeableTimestamp() && localDeletionTime < gcBefore;
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c12f3/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
new file mode 100644
index 0000000..3a74029
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.AbstractRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class RepairedDataTombstonesTest extends CQLTester
+{
+ @Test
+ public void compactionTest() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ // insert a live row to make sure that the sstables are not dropped (we test dropping in compactionDropExpiredSSTableTest() below)
+ execute("insert into %s (id, id2, t) values (999,999,'live')");
+ for (int i = 0; i < 10; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+ repair(getCurrentColumnFamilyStore(), repairedSSTable);
+ Thread.sleep(2000);
+ execute("insert into %s (id, id2, t) values (999,999,'live')");
+ for (int i = 10; i < 20; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ Thread.sleep(1000);
+ // at this point we have 2 sstables, one repaired and one unrepaired. Both sstables contain expired tombstones, but we should only drop the tombstones from the repaired sstable.
+ getCurrentColumnFamilyStore().forceMajorCompaction();
+ verify();
+ verify2(1);
+ assertEquals(2, Iterables.size(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE)));
+
+ }
+
+ @Test
+ public void compactionDropExpiredSSTableTest() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ for (int i = 0; i < 10; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+ repair(getCurrentColumnFamilyStore(), repairedSSTable);
+ Thread.sleep(2000);
+ for (int i = 10; i < 20; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ Thread.sleep(1000);
+ getCurrentColumnFamilyStore().forceMajorCompaction();
+ verify();
+ verify2(1);
+ assertEquals(1, Iterables.size(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE)));
+ assertFalse(getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next().isRepaired());
+
+ }
+
+ @Test
+ public void readTest() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ for (int i = 0; i < 10; i++)
+ {
+ execute("update %s set t2=null where id=? and id2=?", 123, i);
+ }
+ flush();
+ SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+ repair(getCurrentColumnFamilyStore(), repairedSSTable);
+ Thread.sleep(2000);
+ for (int i = 10; i < 20; i++)
+ {
+ execute("update %s set t2=null where id=? and id2=?", 123, i);
+ }
+ flush();
+ // allow gcgrace to properly expire:
+ Thread.sleep(1000);
+ // make sure we only see the unrepaired tombstones, the other ones are expired and can be purged
+ verify();
+ verify2(123);
+ }
+
+ @Test
+ public void readOnlyUnrepairedTest() throws Throwable
+ {
+ // make sure we keep all tombstones if we only have unrepaired data
+ createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ for (int i = 10; i < 20; i++)
+ {
+ execute("update %s set t2=null where id=? and id2=?", 123, i);
+ }
+ flush();
+
+ // allow gcgrace to properly expire:
+ Thread.sleep(1000);
+ verify();
+ verify2(123);
+ }
+
+
+ @Test
+ public void readTestRowTombstones() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ for (int i = 0; i < 10; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+ repair(getCurrentColumnFamilyStore(), repairedSSTable);
+ Thread.sleep(2000);
+ for (int i = 10; i < 20; i++)
+ {
+ execute("delete from %s where id=? and id2=?", 1, i);
+ }
+ flush();
+ Thread.sleep(1000);
+ verify();
+ verify2(1);
+ }
+
+ @Test
+ public void readTestPartitionTombstones() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ for (int i = 0; i < 10; i++)
+ {
+ execute("delete from %s where id=?", i);
+ }
+ flush();
+ SSTableReader repairedSSTable = getCurrentColumnFamilyStore().getSSTables(SSTableSet.LIVE).iterator().next();
+ repair(getCurrentColumnFamilyStore(), repairedSSTable);
+ Thread.sleep(2000);
+ for (int i = 10; i < 20; i++)
+ {
+ execute("delete from %s where id=?", i);
+ }
+ flush();
+
+ Thread.sleep(1000);
+ ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore()).build();
+ int partitionsFound = 0;
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+ {
+ while (iterator.hasNext())
+ {
+ partitionsFound++;
+ UnfilteredRowIterator rowIter = iterator.next();
+ int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey());
+ assertTrue("val=" + val, val >= 10 && val < 20);
+ }
+ }
+ assertEquals(10, partitionsFound);
+ }
+
+ @Test
+ public void readTestOldUnrepaired() throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t text, t2 text, primary key (id, id2)) with gc_grace_seconds=0 and compaction = {'class':'SizeTieredCompactionStrategy', 'only_purge_repaired_tombstones':true}");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 10; i++)
+ {
+ execute("delete from %s where id=1 and id2=?", i);
+ }
+ flush();
+ SSTableReader oldSSTable = getCurrentColumnFamilyStore().getLiveSSTables().iterator().next();
+ Thread.sleep(2000);
+ for (int i = 10; i < 20; i++)
+ {
+ execute("delete from %s where id=1 and id2=?", i);
+ }
+ flush();
+ for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+ if (sstable != oldSSTable)
+ repair(getCurrentColumnFamilyStore(), sstable);
+ Thread.sleep(2000);
+ for (int i = 20; i < 30; i++)
+ {
+ execute("delete from %s where id=1 and id2=?", i);
+ }
+ flush();
+
+ Thread.sleep(2000);
+ // we will keep all tombstones since the oldest tombstones are unrepaired:
+ verify(30, 0, 30);
+ verify2(1, 30, 0, 30);
+ }
+
+ private void verify()
+ {
+ verify(10, 10, 20);
+ }
+
+ private void verify(int expectedRows, int minVal, int maxVal)
+ {
+ ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore()).build();
+ int foundRows = 0;
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+ {
+ while (iterator.hasNext())
+ {
+ UnfilteredRowIterator rowIter = iterator.next();
+ if (!rowIter.partitionKey().equals(Util.dk(ByteBufferUtil.bytes(999)))) // partition key 999 is 'live' and used to avoid sstables from being dropped
+ {
+ while (rowIter.hasNext())
+ {
+ AbstractRow row = (AbstractRow) rowIter.next();
+ for (int i = 0; i < row.clustering().size(); i++)
+ {
+ foundRows++;
+ int val = ByteBufferUtil.toInt(row.clustering().get(i));
+ assertTrue("val=" + val, val >= minVal && val < maxVal);
+ }
+ }
+ }
+ }
+ }
+ assertEquals(expectedRows, foundRows);
+ }
+ private void verify2(int key)
+ {
+ verify2(key, 10, 10, 20);
+ }
+
+ private void verify2(int key, int expectedRows, int minVal, int maxVal)
+ {
+ ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), Util.dk(ByteBufferUtil.bytes(key))).build();
+ int foundRows = 0;
+ try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup))
+ {
+ while (iterator.hasNext())
+ {
+ UnfilteredRowIterator rowIter = iterator.next();
+ while (rowIter.hasNext())
+ {
+ AbstractRow row = (AbstractRow) rowIter.next();
+ for (int i = 0; i < row.clustering().size(); i++)
+ {
+ foundRows++;
+ int val = ByteBufferUtil.toInt(row.clustering().get(i));
+ assertTrue("val=" + val, val >= minVal && val < maxVal);
+ }
+ }
+ }
+ }
+ assertEquals(expectedRows, foundRows);
+ }
+
+ public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+ {
+ sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, 1);
+ sstable.reloadSSTableMetadata();
+ cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
+ }
+}
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c73987
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c73987
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c73987
Branch: refs/heads/trunk
Commit: b1c73987453c6c2c9bcc7b545764dce596a4b3e0
Parents: e8ac7ed 6f0c12f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Aug 11 08:31:04 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Aug 11 08:31:04 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 6 +
pylib/cqlshlib/cql3handling.py | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 93 ++++--
.../cassandra/db/PartitionRangeReadCommand.java | 13 +-
.../org/apache/cassandra/db/ReadCommand.java | 4 +-
.../db/SinglePartitionNamesCommand.java | 16 +-
.../db/SinglePartitionSliceCommand.java | 15 +-
.../compaction/AbstractCompactionStrategy.java | 4 +-
.../db/compaction/CompactionController.java | 20 +-
.../db/compaction/CompactionIterator.java | 2 +-
.../compaction/CompactionStrategyManager.java | 5 +
.../db/partitions/PurgingPartitionIterator.java | 5 +-
.../db/RepairedDataTombstonesTest.java | 292 +++++++++++++++++++
14 files changed, 436 insertions(+), 42 deletions(-)
----------------------------------------------------------------------