You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/01/31 08:14:57 UTC

[cassandra] branch trunk updated: Avoid leaking threads when remote nodes fail anticompaction and rate limit anticompactions

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f634fe  Avoid leaking threads when remote nodes fail anticompaction and rate limit anticompactions
7f634fe is described below

commit 7f634feb7cf1fdb135133946ffd75efa681b8cb7
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Jan 30 14:58:45 2019 +0100

    Avoid leaking threads when remote nodes fail anticompaction and rate limit anticompactions
    
    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-15002
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionIterator.java          |  14 +--
 .../cassandra/db/compaction/CompactionManager.java |  12 ++-
 .../cassandra/db/repair/PendingAntiCompaction.java |  76 +++++++++++---
 .../cassandra/repair/consistent/LocalSessions.java |  35 +++++--
 .../db/compaction/CompactionIteratorTest.java      |  20 ----
 .../db/repair/PendingAntiCompactionTest.java       | 114 +++++++++++++++++----
 7 files changed, 199 insertions(+), 73 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 852cccf..a905ad3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002)
  * Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989)
  * Add a new tool to dump audit logs (CASSANDRA-14885)
  * Fix generating javadoc with Java11 (CASSANDRA-14988)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 0aba594..1c56a87 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -79,16 +79,11 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
     {
-        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, true);
-    }
-
-    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
-    {
-        this(type, scanners, controller, nowInSec, compactionId, activeCompactions, true);
+        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP);
     }
 
     @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
-    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions, boolean abortable)
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
     {
         this.controller = controller;
         this.type = type;
@@ -110,10 +105,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                                            : UnfilteredPartitionIterators.merge(scanners, listener());
         merged = Transformation.apply(merged, new GarbageSkipper(controller));
         merged = Transformation.apply(merged, new Purger(controller, nowInSec));
-        if (abortable)
-            compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
-        else
-            compacted = merged;
+        compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
         sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
     }
 
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6fe6f01..01e3aa6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1491,6 +1491,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         int nowInSec = FBUtilities.nowInSeconds();
+        RateLimiter limiter = getRateLimiter();
 
         CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         try (SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(txn, false, groupMaxDataAge);
@@ -1509,6 +1510,12 @@ public class CompactionManager implements CompactionManagerMBean
 
             Predicate<Token> fullChecker = !ranges.onlyFull().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyFull().ranges()) : t -> false;
             Predicate<Token> transChecker = !ranges.onlyTransient().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyTransient().ranges()) : t -> false;
+            double compressionRatio = scanners.getCompressionRatio();
+            if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
+                compressionRatio = 1.0;
+
+            long lastBytesScanned = 0;
+
             while (ci.hasNext())
             {
                 try (UnfilteredRowIterator partition = ci.next())
@@ -1528,6 +1535,9 @@ public class CompactionManager implements CompactionManagerMBean
                         // otherwise, append it to the unrepaired sstable
                         unrepairedWriter.append(partition);
                     }
+                    long bytesScanned = scanners.getTotalBytesScanned();
+                    compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
+                    lastBytesScanned = bytesScanned;
                 }
             }
 
@@ -1562,7 +1572,7 @@ public class CompactionManager implements CompactionManagerMBean
     @VisibleForTesting
     public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker activeCompactions)
     {
-        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions, false);
+        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 029ba59..0449cf1 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +65,8 @@ import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABL
 public class PendingAntiCompaction
 {
     private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompaction.class);
+    private static final int ACQUIRE_SLEEP_MS = Integer.getInteger("cassandra.acquire_sleep_ms", 1000);
+    private static final int ACQUIRE_RETRY_SECONDS = Integer.getInteger("cassandra.acquire_retry_seconds", 60);
 
     static class AcquireResult
     {
@@ -149,12 +153,22 @@ public class PendingAntiCompaction
         private final ColumnFamilyStore cfs;
         private final UUID sessionID;
         private final AntiCompactionPredicate predicate;
+        private final int acquireRetrySeconds;
+        private final int acquireSleepMillis;
 
-        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+        AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
+        {
+            this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
+        }
+
+        @VisibleForTesting
+        AcquisitionCallable(ColumnFamilyStore cfs, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis, AntiCompactionPredicate predicate)
         {
             this.cfs = cfs;
             this.sessionID = sessionID;
-            predicate = new AntiCompactionPredicate(ranges, sessionID);
+            this.predicate = predicate;
+            this.acquireRetrySeconds = acquireRetrySeconds;
+            this.acquireSleepMillis = acquireSleepMillis;
         }
 
         @SuppressWarnings("resource")
@@ -186,17 +200,34 @@ public class PendingAntiCompaction
             logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
             // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions including the given sstables for
             // up to a minute, after which point, null will be returned
-            try
-            {
-                // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
-                // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
-                return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
-            }
-            catch (SSTableAcquisitionException e)
+            long start = System.currentTimeMillis();
+            long delay = TimeUnit.SECONDS.toMillis(acquireRetrySeconds);
+            // Note that it is `predicate` that throws SSTableAcquisitionException when it finds a conflicting sstable,
+            // and we only retry when runWithCompactionsDisabled throws while applying the predicate, not when acquireTuple fails.
+            // This avoids failing in the case where we have an sstable [0, 100] and a user starts a repair on [0, 50] and then on [51, 100]
+            // before anticompaction has finished. It does not help when the second repair is [25, 75], for example - then we fail it without retry.
+            do
             {
-                logger.warn(e.getMessage());
-                logger.debug("Got exception trying to acquire sstables", e);
-            }
+                try
+                {
+                    // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
+                    // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
+                    return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
+                }
+                catch (SSTableAcquisitionException e)
+                {
+                    logger.warn("Session {} failed acquiring sstables: {}, retrying every {}ms for another {}s",
+                                sessionID,
+                                e.getMessage(),
+                                acquireSleepMillis,
+                                TimeUnit.SECONDS.convert(delay + start - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
+                    Uninterruptibles.sleepUninterruptibly(acquireSleepMillis, TimeUnit.MILLISECONDS);
+
+                    if (System.currentTimeMillis() - start > delay)
+                        logger.debug("{} Timed out waiting to acquire sstables", sessionID, e);
+
+                }
+            } while (System.currentTimeMillis() - start < delay);
             return null;
         }
     }
@@ -272,16 +303,31 @@ public class PendingAntiCompaction
     private final Collection<ColumnFamilyStore> tables;
     private final RangesAtEndpoint tokenRanges;
     private final ExecutorService executor;
+    private final int acquireRetrySeconds;
+    private final int acquireSleepMillis;
 
     public PendingAntiCompaction(UUID prsId,
                                  Collection<ColumnFamilyStore> tables,
                                  RangesAtEndpoint tokenRanges,
                                  ExecutorService executor)
     {
+        this(prsId, tables, tokenRanges, ACQUIRE_RETRY_SECONDS, ACQUIRE_SLEEP_MS, executor);
+    }
+
+    @VisibleForTesting
+    PendingAntiCompaction(UUID prsId,
+                                 Collection<ColumnFamilyStore> tables,
+                                 RangesAtEndpoint tokenRanges,
+                                 int acquireRetrySeconds,
+                                 int acquireSleepMillis,
+                                 ExecutorService executor)
+    {
         this.prsId = prsId;
         this.tables = tables;
         this.tokenRanges = tokenRanges;
         this.executor = executor;
+        this.acquireRetrySeconds = acquireRetrySeconds;
+        this.acquireSleepMillis = acquireSleepMillis;
     }
 
     public ListenableFuture run()
@@ -290,7 +336,7 @@ public class PendingAntiCompaction
         for (ColumnFamilyStore cfs : tables)
         {
             cfs.forceBlockingFlush();
-            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
+            ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(getAcquisitionCallable(cfs, tokenRanges.ranges(), prsId, acquireRetrySeconds, acquireSleepMillis));
             executor.submit(task);
             tasks.add(task);
         }
@@ -300,9 +346,9 @@ public class PendingAntiCompaction
     }
 
     @VisibleForTesting
-    protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId)
+    protected AcquisitionCallable getAcquisitionCallable(ColumnFamilyStore cfs, Set<Range<Token>> ranges, UUID prsId, int acquireRetrySeconds, int acquireSleepMillis)
     {
-        return new AcquisitionCallable(cfs, ranges, prsId);
+        return new AcquisitionCallable(cfs, ranges, prsId, acquireRetrySeconds, acquireSleepMillis);
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index eac1ea0..c28391a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -618,18 +618,37 @@ public class LocalSessions
         {
             public void onSuccess(@Nullable Object result)
             {
-                logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
-                setStateAndSave(session, PREPARED);
-                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
-                executor.shutdown();
+                try
+                {
+                    logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
+                    if (session.getState() != FAILED)
+                    {
+                        setStateAndSave(session, PREPARED);
+                        sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
+                    }
+                    else
+                    {
+                        logger.debug("Session {} failed before anticompaction completed", sessionID);
+                    }
+                }
+                finally
+                {
+                    executor.shutdown();
+                }
             }
 
             public void onFailure(Throwable t)
             {
-                logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
-                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
-                failSession(sessionID, false);
-                executor.shutdown();
+                try
+                {
+                    logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
+                    sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
+                    failSession(sessionID, false);
+                }
+                finally
+                {
+                    executor.shutdown();
+                }
             }
         });
     }
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index bb02dab..d5b066b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -372,26 +372,6 @@ public class CompactionIteratorTest
         }
     }
 
-    @Test
-    public void noTransformPartitionTest()
-    {
-        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
-        List<List<Unfiltered>> inputLists = parse(new String[] {"10[100] 11[100] 12[100]"}, generator);
-        List<List<Unfiltered>> tombstoneLists = parse(new String[] {}, generator);
-        List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(inputLists, list -> ImmutableList.of(listToIterator(list, kk))));
-        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
-        transformedSources.put(kk, Iterables.transform(tombstoneLists, list -> listToIterator(list, kk)));
-        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
-             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
-                                                              Lists.transform(content, x -> new Scanner(x)),
-                                                              controller, NOW, null, null, false))
-        {
-            iter.stop();
-            // not abortable CompactionIterator
-            assertTrue(iter.hasNext());
-        }
-    }
-
     class Controller extends CompactionController
     {
         private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 385deac..1d4a97f 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -80,6 +81,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -170,12 +173,12 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
         }
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, ranges, UUIDGen.getTimeUUID(), 0, 0);
 
         logger.info("SSTables: {}", sstables);
         logger.info("Expected: {}", expected);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
         logger.info("Originals: {}", result.txn.originals());
         assertEquals(3, result.txn.originals().size());
         for (SSTableReader sstable : expected)
@@ -204,9 +207,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
         repaired.reloadSSTableMetadata();
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
 
         logger.info("Originals: {}", result.txn.originals());
         assertEquals(1, result.txn.originals().size());
@@ -233,9 +236,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         repaired.reloadSSTableMetadata();
         assertTrue(repaired.isPendingRepair());
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
 
         logger.info("Originals: {}", result.txn.originals());
         assertEquals(1, result.txn.originals().size());
@@ -261,9 +264,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         repaired.reloadSSTableMetadata();
         assertTrue(repaired.isPendingRepair());
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNull(result);
+        assertNull(result);
     }
 
     @Test
@@ -273,9 +276,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
 
         assertEquals(0, cfs.getLiveSSTables().size());
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
 
         result.abort();  // There's nothing to release, but we should exit cleanly
     }
@@ -289,9 +292,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         cfs.disableAutoCompaction();
         makeSSTables(2);
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
 
         InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
         assertTrue(cb.submittedCompactions.isEmpty());
@@ -312,9 +315,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         cfs.disableAutoCompaction();
         makeSSTables(2);
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
         assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
 
         InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
@@ -335,9 +338,9 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         cfs.disableAutoCompaction();
         makeSSTables(2);
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
-        Assert.assertNotNull(result);
+        assertNotNull(result);
 
         ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id);
         PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
@@ -358,7 +361,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         cfs.disableAutoCompaction();
         makeSSTables(2);
 
-        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
+        PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID(), 0, 0);
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         UUID sessionID = UUIDGen.getTimeUUID();
         ActiveRepairService.instance.registerParentRepairSession(sessionID,
@@ -434,7 +437,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
                 // now we try to start a new AC, which will try to cancel all ongoing compactions
 
                 CompactionManager.instance.active.beginCompaction(ci);
-                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
+                PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), 0, 0, es);
                 ListenableFuture fut = pac.run();
                 try
                 {
@@ -599,6 +602,81 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         }
     }
 
+    @Test
+    public void testRetries() throws InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
+        CountDownLatch cdl = new CountDownLatch(5);
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        CompactionInfo.Holder holder = new CompactionInfo.Holder()
+        {
+            public CompactionInfo getCompactionInfo()
+            {
+                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
+            }
+        };
+        try
+        {
+            PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
+            {
+                @Override
+                public boolean apply(SSTableReader sstable)
+                {
+                    cdl.countDown();
+                    if (cdl.getCount() > 0)
+                        throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
+                    return true;
+                }
+            };
+            CompactionManager.instance.active.beginCompaction(holder);
+            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 10, 1, acp);
+            Future f = es.submit(acquisitionCallable);
+            cdl.await();
+            assertNotNull(f.get());
+        }
+        finally
+        {
+            es.shutdown();
+            CompactionManager.instance.active.finishCompaction(holder);
+        }
+    }
+
+    @Test
+    public void testRetriesTimeout() throws InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        cfs.addSSTable(MockSchema.sstable(1, true, cfs));
+        ExecutorService es = Executors.newFixedThreadPool(1);
+        CompactionInfo.Holder holder = new CompactionInfo.Holder()
+        {
+            public CompactionInfo getCompactionInfo()
+            {
+                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, UUID.randomUUID(), cfs.getLiveSSTables());
+            }
+        };
+        try
+        {
+            PendingAntiCompaction.AntiCompactionPredicate acp = new PendingAntiCompaction.AntiCompactionPredicate(FULL_RANGE, UUID.randomUUID())
+            {
+                @Override
+                public boolean apply(SSTableReader sstable)
+                {
+                    throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
+                }
+            };
+            CompactionManager.instance.active.beginCompaction(holder);
+            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, UUID.randomUUID(), 2, 1000, acp);
+            Future fut = es.submit(acquisitionCallable);
+            assertNull(fut.get());
+        }
+        finally
+        {
+            es.shutdown();
+            CompactionManager.instance.active.finishCompaction(holder);
+        }
+    }
+
     private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
     {
         RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org