You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/01/31 08:14:57 UTC
[cassandra] branch trunk updated: Avoid leaking threads when remote
nodes fail anticompaction and rate limit anticompactions
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7f634fe Avoid leaking threads when remote nodes fail anticompaction and rate limit anticompactions
7f634fe is described below
commit 7f634feb7cf1fdb135133946ffd75efa681b8cb7
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jan 30 14:58:45 2019 +0100
Avoid leaking threads when remote nodes fail anticompaction and rate limit anticompactions
Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15002
---
CHANGES.txt | 1 +
.../db/compaction/CompactionIterator.java | 14 +--
.../cassandra/db/compaction/CompactionManager.java | 12 ++-
.../cassandra/db/repair/PendingAntiCompaction.java | 76 +++++++++++---
.../cassandra/repair/consistent/LocalSessions.java | 35 +++++--
.../db/compaction/CompactionIteratorTest.java | 20 ----
.../db/repair/PendingAntiCompactionTest.java | 114 +++++++++++++++++----
7 files changed, 199 insertions(+), 73 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 852cccf..a905ad3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002)
* Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989)
* Add a new tool to dump audit logs (CASSANDRA-14885)
* Fix generating javadoc with Java11 (CASSANDRA-14988)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0aba594..1c56a87 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -79,16 +79,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
{
- this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, true);
- }
-
- public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
- {
- this(type, scanners, controller, nowInSec, compactionId, activeCompactions, true);
+ this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP);
}
@SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
- public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions, boolean abortable)
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
{
this.controller = controller;
this.type = type;
@@ -110,10 +105,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
: UnfilteredPartitionIterators.merge(scanners, listener());
merged = Transformation.apply(merged, new GarbageSkipper(controller));
merged = Transformation.apply(merged, new Purger(controller, nowInSec));
- if (abortable)
- compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
- else
- compacted = merged;
+ compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6fe6f01..01e3aa6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1491,6 +1491,7 @@ public class CompactionManager implements CompactionManagerMBean
File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
int nowInSec = FBUtilities.nowInSeconds();
+ RateLimiter limiter = getRateLimiter();
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
try (SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
@@ -1509,6 +1510,12 @@ public class CompactionManager implements CompactionManagerMBean
Predicate<Token> fullChecker = !ranges.onlyFull().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyFull().ranges()) : t -> false;
Predicate<Token> transChecker = !ranges.onlyTransient().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyTransient().ranges()) : t -> false;
+ double compressionRatio = scanners.getCompressionRatio();
+ if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
+ compressionRatio = 1.0;
+
+ long lastBytesScanned = 0;
+
while (ci.hasNext())
{
try (UnfilteredRowIterator partition = ci.next())
@@ -1528,6 +1535,9 @@ public class CompactionManager implements CompactionManagerMBean
// otherwise, append it to the unrepaired sstable
unrepairedWriter.append(partition);
}
+ long bytesScanned = scanners.getTotalBytesScanned();
+ compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+ lastBytesScanned = bytesScanned;
}
}
@@ -1562,7 +1572,7 @@ public class CompactionManager implements CompactionManagerMBean
@VisibleForTesting
public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker activeCompactions)
{
- return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions, false);
+ return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 029ba59..0449cf1 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,8 @@ import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABL
public class PendingAntiCompaction
{
private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class);
+ private static final int ACQUIRE_SLEEP_MS = Integer.getInteger("cassandra.acquire_sleep_ms", 1000);
+ private static final int ACQUIRE_RETRY_SECONDS = Integer.getInteger("cassandra.acquire_retry_seconds", 60);
static class AcquireResult
{
@@ -149,12 +153,22 @@ public class PendingAntiCompaction
private final ColumnFamilyStore cfs;
private final UUID sessionID;
private final AntiCompactionPredicate predicate;
+ private final int acquireRetrySeconds;
+ private final int acquireSleepMillis;
- public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+ AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
+ {
+ this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
+ }
+
+ @VisibleForTesting
+ AcquisitionCallable(ColumnFamilyStore cfs, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis, AntiCompactionPredicate predicate)
{
this.cfs = cfs;
this.sessionID = sessionID;
- predicate = new AntiCompactionPredicate(ranges, sessionID);
+ this.predicate = predicate;
+ this.acquireRetrySeconds = acquireRetrySeconds;
+ this.acquireSleepMillis = acquireSleepMillis;
}
@SuppressWarnings("resource")
@@ -186,17 +200,34 @@ public class PendingAntiCompaction
logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
// try to modify after cancelling running compactions. This will attempt to cancel in flight compactions including the given sstables for
// up to a minute, after which point, null will be returned
- try
- {
- // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
- // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
- return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
- }
- catch (SSTableAcquisitionException e)
+ long start = System.currentTimeMillis();
+ long delay = TimeUnit.SECONDS.toMillis(acquireRetrySeconds);
+ // Note that it is `predicate` throwing SSTableAcquisitionException if it finds a conflicting sstable
+ // and we only retry when runWithCompactionsDisabled throws when uses the predicate, not when acquireTuple is.
+ // This avoids the case when we have an sstable [0, 100] and a user starts a repair on [0, 50] and then [51, 100] before
+ // anticompaction has finished but not when the second repair is [25, 75] for example - then we will fail it without retry.
+ do
{
- logger.warn(e.getMessage());
- logger.debug("Got exception trying to acquire sstables", e);
- }
+ try
+ {
+ // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
+ // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
+ return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
+ }
+ catch (SSTableAcquisitionException e)
+ {
+ logger.warn("Session {} failed acquiring sstables: {}, retrying every {}ms for another {}s",
+ sessionID,
+ e.getMessage(),
+ acquireSleepMillis,
+ TimeUnit.SECONDS.convert(delay + start - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+ Uninterruptibles.sleepUninterruptibly(acquireSleepMillis, TimeUnit.MILLISECONDS);
+
+ if (System.currentTimeMillis() - start > delay)
+ logger.debug("{} Timed out waiting to acquire sstables", sessionID, e);
+
+ }
+ } while (System.currentTimeMillis() - start < delay);
return null;
}
}
@@ -272,16 +303,31 @@ public class PendingAntiCompaction
private final Collection<ColumnFamilyStore> tables;
private final RangesAtEndpoint tokenRanges;
private final ExecutorService executor;
+ private final int acquireRetrySeconds;
+ private final int acquireSleepMillis;
public PendingAntiCompaction(UUID prsId,
Collection<ColumnFamilyStore> tables,
RangesAtEndpoint tokenRanges,
ExecutorService executor)
{
+ this(prsId, tables, tokenRanges, ACQUIRE_RETRY_SECONDS, ACQUIRE_SLEEP_MS, executor);
+ }
+
+ @VisibleForTesting
+ PendingAntiCompaction(UUID prsId,
+ Collection<ColumnFamilyStore> tables,
+ RangesAtEndpoint tokenRanges,
+ int acquireRetrySeconds,
+ int acquireSleepMillis,
+ ExecutorService executor)
+ {
this.prsId = prsId;
this.tables = tables;
this.tokenRanges = tokenRanges;
this.executor = executor;
+ this.acquireRetrySeconds = acquireRetrySeconds;
+ this.acquireSleepMillis = acquireSleepMillis;
}
public ListenableFuture run()
@@ -290,7 +336,7 @@ public class PendingAntiCompaction
for (ColumnFamilyStore cfs : tables)
{
cfs.forceBlockingFlush();
- ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
+ ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId, acquireRetrySeconds, acquireSleepMillis));
executor.submit(task);
tasks.add(task);
}
@@ -300,9 +346,9 @@ public class PendingAntiCompaction
}
@VisibleForTesting
- protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId)
+ protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId, int acquireRetrySeconds, int acquireSleepMillis)
{
- return new AcquisitionCallable(cfs, ranges, prsId);
+ return new AcquisitionCallable(cfs, ranges, prsId, acquireRetrySeconds, acquireSleepMillis);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index eac1ea0..c28391a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -618,18 +618,37 @@ public class LocalSessions
{
public void onSuccess(@Nullable Object result)
{
- logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
- setStateAndSave(session, PREPARED);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
- executor.shutdown();
+ try
+ {
+ logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
+ if (session.getState() != FAILED)
+ {
+ setStateAndSave(session, PREPARED);
+ sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
+ }
+ else
+ {
+ logger.debug("Session {} failed before anticompaction completed", sessionID);
+ }
+ }
+ finally
+ {
+ executor.shutdown();
+ }
}
public void onFailure(Throwable t)
{
- logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
- failSession(sessionID, false);
- executor.shutdown();
+ try
+ {
+ logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
+ sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+ failSession(sessionID, false);
+ }
+ finally
+ {
+ executor.shutdown();
+ }
}
});
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index bb02dab..d5b066b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -372,26 +372,6 @@ public class CompactionIteratorTest
}
}
- @Test
- public void noTransformPartitionTest()
- {
- UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
- List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator);
- List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator);
- List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk))));
- Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
- transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk)));
- try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
- CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
- Lists.transform(content, x -> new Scanner(x)),
- controller, NOW, null, null, false))
- {
- iter.stop();
- // not abortable CompactionIterator
- assertTrue(iter.hasNext());
- }
- }
-
class Controller extends CompactionController
{
private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 385deac..1d4a97f 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -80,6 +81,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -170,12 +173,12 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
}
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID(), 0, 0);
logger.info("SSTables: {}", sstables);
logger.info("Expected: {}", expected);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
assertEquals(3, result.txn.originals().size());
for (SSTableReader sstable : expected)
@@ -204,9 +207,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
repaired.reloadSSTableMetadata();
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
assertEquals(1, result.txn.originals().size());
@@ -233,9 +236,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
repaired.reloadSSTableMetadata();
assertTrue(repaired.isPendingRepair());
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
assertEquals(1, result.txn.originals().size());
@@ -261,9 +264,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
repaired.reloadSSTableMetadata();
assertTrue(repaired.isPendingRepair());
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNull(result);
+ assertNull(result);
}
@Test
@@ -273,9 +276,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
assertEquals(0, cfs.getLiveSSTables().size());
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
result.abort(); // There's nothing to release, but we should exit cleanly
}
@@ -289,9 +292,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
cfs.disableAutoCompaction();
makeSSTables(2);
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
assertTrue(cb.submittedCompactions.isEmpty());
@@ -312,9 +315,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
cfs.disableAutoCompaction();
makeSSTables(2);
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
@@ -335,9 +338,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
cfs.disableAutoCompaction();
makeSSTables(2);
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
- Assert.assertNotNull(result);
+ assertNotNull(result);
ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id);
PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
@@ -358,7 +361,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
cfs.disableAutoCompaction();
makeSSTables(2);
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
UUID sessionID = UUIDGen.getTimeUUID();
ActiveRepairService.instance.registerParentRepairSession(sessionID,
@@ -434,7 +437,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
// now we try to start a new AC, which will try to cancel all ongoing compactions
CompactionManager.instance.active.beginCompaction(ci);
- PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+ PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es);
ListenableFuture fut = pac.run();
try
{
@@ -599,6 +602,81 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
}
}
+ @Test
+ public void testRetries() throws InterruptedException, ExecutionException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ cfs.addSSTable(MockSchema.sstable(1, true, cfs));
+ CountDownLatch cdl = new CountDownLatch(5);
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ CompactionInfo.Holder holder = new CompactionInfo.Holder()
+ {
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
+ }
+ };
+ try
+ {
+ PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
+ {
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ cdl.countDown();
+ if (cdl.getCount() > 0)
+ throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
+ return true;
+ }
+ };
+ CompactionManager.instance.active.beginCompaction(holder);
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 10, 1, acp);
+ Future f = es.submit(acquisitionCallable);
+ cdl.await();
+ assertNotNull(f.get());
+ }
+ finally
+ {
+ es.shutdown();
+ CompactionManager.instance.active.finishCompaction(holder);
+ }
+ }
+
+ @Test
+ public void testRetriesTimeout() throws InterruptedException, ExecutionException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ cfs.addSSTable(MockSchema.sstable(1, true, cfs));
+ ExecutorService es = Executors.newFixedThreadPool(1);
+ CompactionInfo.Holder holder = new CompactionInfo.Holder()
+ {
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
+ }
+ };
+ try
+ {
+ PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
+ {
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
+ }
+ };
+ CompactionManager.instance.active.beginCompaction(holder);
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 2, 1000, acp);
+ Future fut = es.submit(acquisitionCallable);
+ assertNull(fut.get());
+ }
+ finally
+ {
+ es.shutdown();
+ CompactionManager.instance.active.finishCompaction(holder);
+ }
+ }
+
private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
{
RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org