You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/23 09:29:18 UTC
[8/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ee6e7bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ee6e7bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ee6e7bc
Branch: refs/heads/trunk
Commit: 5ee6e7bc12a22744db462106abe1372a72b07a41
Parents: 458b36b 225232a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jun 23 11:26:43 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jun 23 11:26:43 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 32 ++--
.../repair/RepairMessageVerbHandler.java | 31 ++--
.../cassandra/service/ActiveRepairService.java | 175 +++++++++++++++++--
.../service/ActiveRepairServiceTest.java | 125 +++++++++++--
5 files changed, 305 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ee2f6d3,b366d21..f7e854d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -6,35 -2,9 +6,36 @@@ Merged from 2.2
* Allow nodetool info to run with readonly JMX access (CASSANDRA-11755)
* Validate bloom_filter_fp_chance against lowest supported
value when the table is created (CASSANDRA-11920)
- * RandomAccessReader: call isEOF() only when rebuffering, not for every read operation (CASSANDRA-12013)
* Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
* StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
+Merged from 2.1:
++ * Avoid marking too many sstables as repaired (CASSANDRA-11696)
+ * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
+ * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
+ * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
+ * Cache local ranges when calculating repair neighbors (CASSANDRA-11934)
+ * Allow LWT operation on static column with only partition keys (CASSANDRA-10532)
+ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
+ * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
+
+
+3.0.7
+ * Fix legacy serialization of Thrift-generated non-compound range tombstones
+ when communicating with 2.x nodes (CASSANDRA-11930)
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
* Persist local metadata earlier in startup sequence (CASSANDRA-11742)
* Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index bf7bd81,cf82498..99e0fd5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1059,10 -1068,16 +1059,17 @@@ public class CompactionManager implemen
try
{
- String snapshotName = validator.desc.sessionId.toString();
int gcBefore;
+ int nowInSec = FBUtilities.nowInSeconds();
+ UUID parentRepairSessionId = validator.desc.parentSessionId;
+ String snapshotName;
+ boolean isGlobalSnapshotValidation = cfs.snapshotExists(parentRepairSessionId.toString());
+ if (isGlobalSnapshotValidation)
+ snapshotName = parentRepairSessionId.toString();
+ else
+ snapshotName = validator.desc.sessionId.toString();
boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+
if (isSnapshotValidation)
{
// If there is a snapshot created for the session then read from there.
@@@ -1083,40 -1098,53 +1090,42 @@@
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
sstables = getSSTablesToValidate(cfs, validator);
if (sstables == null)
-- return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
++ return; // this means the parent repair session was removed - the repair session failed on another node and we removed i
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
- gcBefore = getDefaultGcBefore(cfs);
+ gcBefore = getDefaultGcBefore(cfs, nowInSec);
}
- // Create Merkle tree suitable to hold estimated partitions for given range.
- // We blindly assume that partition is evenly distributed on all sstables for now.
- long numPartitions = 0;
- for (SSTableReader sstable : sstables)
- {
- numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
- }
+ // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+ // We blindly assume that a partition is evenly distributed on all sstables for now.
// determine tree depth from number of partitions, but cap at 20 to prevent large tree.
- int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
- MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
-
+ MerkleTrees tree = createMerkleTrees(sstables, validator.desc.ranges, cfs);
long start = System.nanoTime();
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
+ ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
+ CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
{
- CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- metrics.beginCompaction(ci);
- try
+ // validate the CF as we iterate over it
+ validator.prepare(cfs, tree);
+ while (ci.hasNext())
{
- // validate the CF as we iterate over it
- validator.prepare(cfs, tree);
- while (iter.hasNext())
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ try (UnfilteredRowIterator partition = ci.next())
{
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = iter.next();
- validator.add(row);
+ validator.add(partition);
}
- validator.complete();
}
- finally
+ validator.complete();
+ }
+ finally
+ {
- if (isSnapshotValidation)
++ if (isSnapshotValidation && !isGlobalSnapshotValidation)
{
+ // we can only clear the snapshot if we are not doing a global snapshot validation (we then clear it once anticompaction
+ // is done).
- if (isSnapshotValidation && !isGlobalSnapshotValidation)
- {
- cfs.clearSnapshot(snapshotName);
- }
-
- metrics.finishCompaction(ci);
+ cfs.clearSnapshot(snapshotName);
}
}
@@@ -1174,7 -1175,13 +1183,11 @@@
if (prs == null)
return null;
Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
+ if (prs.isGlobal)
+ prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
+ // note that we always grab all existing sstables for this - if we were to just grab the ones that
+ // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
- try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+ try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
{
for (SSTableReader sstable : sstableCandidates.sstables)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c536b13,1701e9a..edcb4f9
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@@ -90,27 -99,23 +90,22 @@@ public class RepairMessageVerbHandler i
desc.keyspace, desc.columnFamily), message.from, id);
return;
}
- final Collection<Range<Token>> repairingRange = desc.ranges;
- Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+ if (prs.isGlobal)
{
- public boolean apply(SSTableReader sstable)
- {
- return sstable != null &&
- !sstable.metadata.isIndex() && // exclude SSTables from 2i
- new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange);
- }
- }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
- if (ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).isGlobal)
+ prs.maybeSnapshot(cfs.metadata.cfId, desc.parentSessionId);
+ }
+ else
{
- Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, desc.parentSessionId);
- if (!Sets.intersection(currentlyRepairing, snapshottedSSSTables).isEmpty())
- final Range<Token> repairingRange = desc.range;
+ cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
{
- // clear snapshot that we just created
- cfs.clearSnapshot(desc.sessionId.toString());
- logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id);
- return;
- }
- ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables);
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable != null &&
- !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
- new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
++ !sstable.metadata.isIndex() && // exclude SSTables from 2i
++ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(desc.ranges);
+ }
+ }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
}
logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6b1fd83,e111155..27c2424
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -36,9 -37,12 +37,11 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
--import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
++import org.apache.cassandra.db.lifecycle.View;
+ import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
@@@ -49,7 -53,7 +52,6 @@@ import org.apache.cassandra.gms.IFailur
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
--import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@@ -460,8 -488,12 +487,12 @@@ public class ActiveRepairService implem
public final boolean isGlobal;
public final long repairedAt;
public final InetAddress coordinator;
+ /**
+ * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
+ */
+ private final Set<UUID> marked = new HashSet<>();
- public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, boolean isGlobal, long repairedAt)
+ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
{
this.coordinator = coordinator;
for (ColumnFamilyStore cfs : columnFamilyStores)
@@@ -471,15 -503,51 +502,51 @@@
}
this.ranges = ranges;
this.repairedAt = repairedAt;
- this.isGlobal = isGlobal;
this.isIncremental = isIncremental;
+ this.isGlobal = isGlobal;
}
+ /**
+ * Mark sstables repairing - either all sstables or only the unrepaired ones depending on
+ *
+ * whether this is an incremental or full repair
+ *
+ * @param cfId the column family
+ * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
+ */
+ public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
+ {
+ if (!marked.contains(cfId))
+ {
- List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES).sstables;
++ List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
+ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+ if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
+ {
+ logger.error("Cannot start multiple repair sessions over the same sstables");
+ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+ }
+ addSSTables(cfId, sstables);
+ marked.add(cfId);
+ }
+ }
+
+ /**
+ * Get the still active sstables we should run anticompaction on
+ *
+ * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
+ * to know which sstables are still there that were there when we started the repair
+ *
+ * @param cfId
+ * @param parentSessionId for checking if there exists a snapshot for this repair
+ * @return
+ */
@SuppressWarnings("resource")
- public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
+ public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId)
{
+ assert marked.contains(cfId);
+ boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString());
ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
- for (SSTableReader sstable : getActiveSSTables(cfId))
+ for (SSTableReader sstable : isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId))
{
Ref<SSTableReader> ref = sstable.tryRef();
if (ref == null)
@@@ -490,12 -558,97 +557,97 @@@
return new Refs<>(references.build());
}
+ /**
+ * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction
+ *
+ * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
+ * actual filename.
+ *
+ * @param cfId
+ * @param parentSessionId
+ * @return
+ */
+ private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId)
+ {
+ Set<SSTableReader> activeSSTables = new HashSet<>();
+ ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+
+ Set<Integer> snapshotGenerations = new HashSet<>();
+ try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString()))
+ {
+ for (SSTableReader sstable : snapshottedSSTables)
+ {
+ snapshotGenerations.add(sstable.descriptor.generation);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
- for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
++ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ if (snapshotGenerations.contains(sstable.descriptor.generation))
+ activeSSTables.add(sstable);
+ return activeSSTables;
+ }
+
+ public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId)
+ {
+ String snapshotName = parentSessionId.toString();
+ if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
+ {
+ Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return sstable != null &&
+ (!isIncremental || !sstable.isRepaired()) &&
- !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
++ !(sstable.metadata.isIndex()) && // exclude SSTables from 2i
+ new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
+ }
+ }, true);
+
+ if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables))
+ {
+ columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString());
+ logger.error("Cannot start multiple repair sessions over the same sstables");
+ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+ }
+ addSSTables(cfId, snapshottedSSTables);
+ marked.add(cfId);
+ }
+ }
+
+
+ /**
+ * Compares other repairing sstables *generation* to the ones we just snapshotted
+ *
+ * we compare generations since the sstables have different paths due to snapshot names
+ *
+ * @param cfId id of the column family store
+ * @param parentSessionId parent repair session
+ * @param sstables the newly snapshotted sstables
+ * @return
+ */
+ private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables)
+ {
+ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+ Set<Integer> currentlyRepairingGenerations = new HashSet<>();
+ Set<Integer> newRepairingGenerations = new HashSet<>();
+ for (SSTableReader sstable : currentlyRepairing)
+ currentlyRepairingGenerations.add(sstable.descriptor.generation);
+ for (SSTableReader sstable : sstables)
+ newRepairingGenerations.add(sstable.descriptor.generation);
+
+ return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
+ }
+
private Set<SSTableReader> getActiveSSTables(UUID cfId)
{
Set<String> repairedSSTables = sstableMap.get(cfId);
Set<SSTableReader> activeSSTables = new HashSet<>();
Set<String> activeSSTableNames = new HashSet<>();
- for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables())
+ ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
- for (SSTableReader sstable : cfs.select(ColumnFamilyStore.CANONICAL_SSTABLES).sstables)
++ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
{
if (repairedSSTables.contains(sstable.getFilename()))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ee6e7bc/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index da067fd,03a25c6..adcd684
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -20,18 -20,23 +20,21 @@@ package org.apache.cassandra.service
import java.net.InetAddress;
import java.util.*;
+ import java.util.concurrent.ExecutionException;
-import com.google.common.base.Predicate;
import com.google.common.collect.Sets;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.compaction.OperationType;
++import org.apache.cassandra.db.lifecycle.SSTableSet;
++import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@@ -225,17 -233,15 +229,15 @@@ public class ActiveRepairServiceTes
public void testGetActiveRepairedSSTableRefs()
{
ColumnFamilyStore store = prepareColumnFamilyStore();
- Set<SSTableReader> original = store.getUnrepairedSSTables();
+ Set<SSTableReader> original = store.getLiveSSTables();
UUID prsId = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, false);
+ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, 0, false);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
-
- //add all sstables to parent repair session
- prs.addSSTables(store.metadata.cfId, original);
+ prs.markSSTablesRepairing(store.metadata.cfId, prsId);
//retrieve all sstable references from parent repair sessions
- Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+ Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId);
Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
assertEquals(original, retrieved);
refs.release();
@@@ -261,20 -267,117 +263,117 @@@
refs.release();
}
+ @Test
+ public void testAddingMoreSSTables()
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
- Set<SSTableReader> original = store.getUnrepairedSSTables();
++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
+ UUID prsId = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true);
++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+ prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+ {
+ Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(original, retrieved);
+ }
+ createSSTables(store, 2);
+ boolean exception = false;
+ try
+ {
+ UUID newPrsId = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, true);
++ ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null, true, System.currentTimeMillis(), true);
+ ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId);
+ }
+ catch (Throwable t)
+ {
+ exception = true;
+ }
+ assertTrue(exception);
+
+ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+ {
+ Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(original, retrieved);
+ }
+ }
+
+ @Test
+ public void testSnapshotAddSSTables() throws ExecutionException, InterruptedException
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ UUID prsId = UUID.randomUUID();
- Set<SSTableReader> original = store.getUnrepairedSSTables();
- ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+
+ UUID prsId2 = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+ createSSTables(store, 2);
+ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+ {
+ assertEquals(original, Sets.newHashSet(refs.iterator()));
+ }
+ store.forceMajorCompaction();
+ // after a major compaction the original sstables will be gone and we will have no sstables to anticompact:
+ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+ {
+ assertEquals(0, refs.size());
+ }
+ }
+
+ @Test
+ public void testSnapshotMultipleRepairs()
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
- Set<SSTableReader> original = store.getUnrepairedSSTables();
++ Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables);
+ UUID prsId = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+ ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.cfId, prsId);
+
+ UUID prsId2 = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.partitioner.getMinimumToken(), store.partitioner.getMinimumToken())), true, true);
++ ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true);
+ boolean exception = false;
+ try
+ {
+ ActiveRepairService.instance.getParentRepairSession(prsId2).maybeSnapshot(store.metadata.cfId, prsId2);
+ }
+ catch (Throwable t)
+ {
+ exception = true;
+ }
+ assertTrue(exception);
+ try (Refs<SSTableReader> refs = ActiveRepairService.instance.getParentRepairSession(prsId).getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId, prsId))
+ {
+ assertEquals(original, Sets.newHashSet(refs.iterator()));
+ }
+ }
+
private ColumnFamilyStore prepareColumnFamilyStore()
{
Keyspace keyspace = Keyspace.open(KEYSPACE5);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF_STANDARD1);
+ store.truncateBlocking();
store.disableAutoCompaction();
- for (int i = 0; i < 10; i++)
+ createSSTables(store, 10);
+ return store;
+ }
+
+ private void createSSTables(ColumnFamilyStore cfs, int count)
+ {
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < count; i++)
{
- new RowUpdateBuilder(store.metadata, System.currentTimeMillis(), Integer.toString(i))
- .clustering("c")
- .add("val", "val")
- .build()
- .applyUnsafe();
- DecoratedKey key = Util.dk(Integer.toString(i));
- Mutation rm = new Mutation(KEYSPACE5, key.getKey());
+ for (int j = 0; j < 10; j++)
- rm.add("Standard1", Util.cellname(Integer.toString(j)),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- timestamp,
- 0);
- rm.apply();
++ {
++ new RowUpdateBuilder(cfs.metadata, timestamp, Integer.toString(j))
++ .clustering("c")
++ .add("val", "val")
++ .build()
++ .applyUnsafe();
++ }
+ cfs.forceBlockingFlush();
}
- store.forceBlockingFlush();
- return store;
}
}