Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/09/07 16:54:09 UTC

[cassandra] branch trunk updated: Introduce compaction priorities to prevent upgrade compaction inability to finish

This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 377e114cb1 Introduce compaction priorities to prevent upgrade compaction inability to finish
377e114cb1 is described below

commit 377e114cb1459895423c292cb0bf7f921fd30e43
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Thu Aug 25 15:27:24 2022 -0400

    Introduce compaction priorities to prevent upgrade compaction inability to finish
    
    Patch by Alex Petrov; reviewed by Josh McKenzie and Marcus Eriksson for CASSANDRA-17851
    
    Co-authored-by: Alex Petrov <ol...@gmail.com>
    Co-authored-by: Josh McKenzie <jm...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  42 +++-
 .../cassandra/db/compaction/CompactionManager.java | 131 +++++++-----
 .../db/compaction/CompactionStrategyManager.java   |   4 +-
 .../cassandra/db/compaction/OperationType.java     |  60 ++++--
 .../cassandra/db/repair/PendingAntiCompaction.java |   7 +-
 .../distributed/test/PreviewRepairTest.java        |  31 +--
 .../distributed/test/UpgradeSSTablesTest.java      | 223 ++++++++++++++++++++-
 .../LongLeveledCompactionStrategyTest.java         |   4 +-
 .../db/compaction/CancelCompactionsTest.java       |  19 +-
 .../db/repair/PendingAntiCompactionTest.java       |  27 ++-
 11 files changed, 421 insertions(+), 128 deletions(-)
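
For readers skimming the diff below, the heart of the change is a numeric priority attached to each OperationType: an operation may only interrupt in-progress tasks whose priority number is strictly larger (i.e. lower priority) than its own. A minimal standalone sketch of that rule, using hypothetical names rather than the real Cassandra classes:

    public class PrioritySketch
    {
        enum SketchOpType
        {
            P0(0), UPGRADE_SSTABLES(1), COMPACTION(5);

            final int priority;

            SketchOpType(int priority) { this.priority = priority; }

            // The requesting operation may cancel a running task only when the running
            // task's priority number is strictly larger (i.e. the task is lower priority).
            boolean mayCancel(SketchOpType running)
            {
                return this.priority < running.priority;
            }
        }

        public static void main(String[] args)
        {
            // upgradesstables (priority 1) may interrupt a regular compaction (priority 5)...
            System.out.println(SketchOpType.UPGRADE_SSTABLES.mayCancel(SketchOpType.COMPACTION)); // true
            // ...but a regular compaction may not interrupt a running upgradesstables.
            System.out.println(SketchOpType.COMPACTION.mayCancel(SketchOpType.UPGRADE_SSTABLES)); // false
        }
    }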

diff --git a/CHANGES.txt b/CHANGES.txt
index 6d6c736e1e..a3548e313d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
  * Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
  * Remove dependency on Maven Ant Tasks (CASSANDRA-17750)
  * Update ASM(9.1 to 9.3), Mockito(1.10.10 to 1.12.13) and ByteBuddy(3.2.4 to 4.7.0) (CASSANDRA-17835)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index de1033ae0c..e4b9d781e7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -83,6 +83,7 @@ import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.CompactionStrategyManager;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -1800,7 +1801,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
                 return session != null && sessions.contains(session);
             };
             return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions),
-                                              predicate, false, true, true);
+                                              predicate, OperationType.STREAM, false, true, true);
         }
         else
         {
@@ -2539,7 +2540,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             cfs.runWithCompactionsDisabled((Callable<Void>) () -> {
                 cfs.data.reset(memtableFactory.create(new AtomicReference<>(CommitLogPosition.NONE), cfs.metadata, cfs));
                 return null;
-            }, true, false);
+            }, OperationType.P0, true, false);
         }
     }
 
@@ -2628,7 +2629,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             }
         };
 
-        runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), true, true);
+        runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), OperationType.P0, true, true);
 
         viewManager.build();
 
@@ -2659,9 +2660,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             FBUtilities.waitOnFuture(dumpMemtable());
     }
 
-    public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, OperationType operationType, boolean interruptValidation, boolean interruptViews)
     {
-        return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews, true);
+        return runWithCompactionsDisabled(callable, (sstable) -> true, operationType, interruptValidation, interruptViews, true);
     }
 
     /**
@@ -2674,13 +2675,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
      * @param interruptIndexes if we should interrupt compactions on indexes. NOTE: if you set this to true your sstablePredicate
      *                         must be able to handle LocalPartitioner sstables!
      */
-    public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes)
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, OperationType operationType, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes)
     {
         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
         synchronized (this)
         {
-            logger.trace("Cancelling in-progress compactions for {}", metadata.name);
+            logger.debug("Cancelling in-progress compactions for {}", metadata.name);
             Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes
                                                          ? concatWithIndexes()
                                                          : Collections.singleton(this);
@@ -2689,9 +2690,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
                              ? Iterables.concat(toInterruptFor, viewManager.allViewsCfs())
                              : toInterruptFor;
 
+            Iterable<TableMetadata> toInterruptForMetadata = Iterables.transform(toInterruptFor, ColumnFamilyStore::metadata);
+
             try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction();
                  CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor))
             {
+                List<CompactionInfo.Holder> uninterruptibleTasks = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata,
+                                                                                                                     (info) -> info.getTaskType().priority <= operationType.priority);
+                if (!uninterruptibleTasks.isEmpty())
+                {
+                    logger.info("Unable to cancel in-progress compactions, since they're running with higher or same priority: {}. You can abort these operations using `nodetool stop`.",
+                                uninterruptibleTasks.stream().map((compaction) -> String.format("%s@%s (%s)",
+                                                                                                compaction.getCompactionInfo().getTaskType(),
+                                                                                                compaction.getCompactionInfo().getTable(),
+                                                                                                compaction.getCompactionInfo().getTaskId()))
+                                                    .collect(Collectors.joining(",")));
+                    return null;
+                }
+
                 // interrupt in-progress compactions
                 CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation);
                 CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate);
@@ -2701,7 +2717,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
                 {
                     if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
                     {
-                        logger.warn("Unable to cancel in-progress compactions for {}.  Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
+                        logger.warn("Unable to cancel in-progress compactions for {}. " +
+                                    "Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.",
+                                    metadata.name);
                         return null;
                     }
                 }
@@ -2756,7 +2774,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         return accumulate;
     }
 
-    public LifecycleTransaction markAllCompacting(final OperationType operationType)
+    public <T> T withAllSSTables(final OperationType operationType, Function<LifecycleTransaction, T> op)
     {
         Callable<LifecycleTransaction> callable = () -> {
             assert data.getCompacting().isEmpty() : data.getCompacting();
@@ -2767,10 +2785,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             return modifier;
         };
 
-        return runWithCompactionsDisabled(callable, false, false);
+        try (LifecycleTransaction compacting = runWithCompactionsDisabled(callable, operationType, false, false))
+        {
+            return op.apply(compacting);
+        }
     }
 
-
     @Override
     public String toString()
     {
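
A note on the markAllCompacting -> withAllSSTables change above: ownership of the LifecycleTransaction moves into ColumnFamilyStore, which applies the caller's function inside try-with-resources instead of handing the transaction out to be closed elsewhere. A rough sketch of that shape, with stand-in names rather than the real API:

    import java.util.function.Function;

    class WithResourceSketch
    {
        // Stand-in for LifecycleTransaction; close() is redeclared without a checked
        // exception so the interface works directly in try-with-resources.
        interface Txn extends AutoCloseable
        {
            @Override
            void close();
        }

        static <T> T withAllSSTables(Txn compacting, Function<Txn, T> op)
        {
            // compacting may be null when in-progress work could not be cancelled;
            // try-with-resources tolerates a null resource, so op still runs and can
            // map null to an UNABLE_TO_CANCEL-style status.
            try (Txn scoped = compacting)
            {
                return op.apply(scoped);
            }
        }
    }
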
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5906ac294a..dc22b6712a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -369,68 +369,70 @@ public class CompactionManager implements CompactionManagerMBean
      * @throws InterruptedException
      */
     @SuppressWarnings("resource")
-    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException
+    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType)
     {
-        logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName());
-        List<LifecycleTransaction> transactions = new ArrayList<>();
-        List<Future<?>> futures = new ArrayList<>();
-        try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType))
-        {
-            if (compacting == null)
-                return AllSSTableOpStatus.UNABLE_TO_CANCEL;
-
-            Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
-            if (Iterables.isEmpty(sstables))
+        return cfs.withAllSSTables(operationType, (compacting) -> {
+            logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName());
+            List<LifecycleTransaction> transactions = new ArrayList<>();
+            List<Future<?>> futures = new ArrayList<>();
+            try
             {
-                logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
-                return AllSSTableOpStatus.SUCCESSFUL;
-            }
+                if (compacting == null)
+                    return AllSSTableOpStatus.UNABLE_TO_CANCEL;
 
-            for (final SSTableReader sstable : sstables)
-            {
-                final LifecycleTransaction txn = compacting.split(singleton(sstable));
-                transactions.add(txn);
-                Callable<Object> callable = new Callable<Object>()
+                Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
+                if (Iterables.isEmpty(sstables))
                 {
-                    @Override
-                    public Object call() throws Exception
-                    {
-                        operation.execute(txn);
-                        return this;
-                    }
-                };
-                Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation");
-                if (!fut.isCancelled())
-                    futures.add(fut);
-                else
-                    return AllSSTableOpStatus.ABORTED;
+                    logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
+                    return AllSSTableOpStatus.SUCCESSFUL;
+                }
 
-                if (jobs > 0 && futures.size() == jobs)
+                for (final SSTableReader sstable : sstables)
                 {
-                    Future<?> f = FBUtilities.waitOnFirstFuture(futures);
-                    futures.remove(f);
+                    final LifecycleTransaction txn = compacting.split(singleton(sstable));
+                    transactions.add(txn);
+                    Callable<Object> callable = new Callable<Object>()
+                    {
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            operation.execute(txn);
+                            return this;
+                        }
+                    };
+                    Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation");
+                    if (!fut.isCancelled())
+                        futures.add(fut);
+                    else
+                        return AllSSTableOpStatus.ABORTED;
+
+                    if (jobs > 0 && futures.size() == jobs)
+                    {
+                        Future<?> f = FBUtilities.waitOnFirstFuture(futures);
+                        futures.remove(f);
+                    }
                 }
-            }
-            FBUtilities.waitOnFutures(futures);
-            assert compacting.originals().isEmpty();
-            logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName());
-            return AllSSTableOpStatus.SUCCESSFUL;
-        }
-        finally
-        {
-            // wait on any unfinished futures to make sure we don't close an ongoing transaction
-            try
-            {
                 FBUtilities.waitOnFutures(futures);
+                assert compacting.originals().isEmpty();
+                logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName());
+                return AllSSTableOpStatus.SUCCESSFUL;
             }
-            catch (Throwable t)
+            finally
             {
-               // these are handled/logged in CompactionExecutor#afterExecute
+                // wait on any unfinished futures to make sure we don't close an ongoing transaction
+                try
+                {
+                    FBUtilities.waitOnFutures(futures);
+                }
+                catch (Throwable t)
+                {
+                    // these are handled/logged in CompactionExecutor#afterExecute
+                }
+                Throwable fail = Throwables.close(null, transactions);
+                if (fail != null)
+                    logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
             }
-            Throwable fail = Throwables.close(null, transactions);
-            if (fail != null)
-                logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
-        }
+        });
     }
 
     private static interface OneSSTableOperation
@@ -914,11 +916,17 @@ public class CompactionManager implements CompactionManagerMBean
 
     @SuppressWarnings("resource") // the tasks are executed in parallel on the executor, making sure that they get closed
     public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput)
+    {
+            return submitMaximal(cfStore, gcBefore, splitOutput, OperationType.MAJOR_COMPACTION);
+    }
+
+    @SuppressWarnings("resource")
+    public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput, OperationType operationType)
     {
         // here we compute the task off the compaction executor, so having that present doesn't
         // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
         // for ourselves to finish/acknowledge cancellation before continuing.
-        CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
+        CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput, operationType);
 
         if (tasks.isEmpty())
             return Collections.emptyList();
@@ -963,6 +971,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator,
                                                                         sstablesPredicate,
+                                                                        OperationType.MAJOR_COMPACTION,
                                                                         false,
                                                                         false,
                                                                         false))
@@ -2265,6 +2274,24 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    public List<Holder> getCompactionsMatching(Iterable<TableMetadata> columnFamilies, Predicate<CompactionInfo> predicate)
+    {
+        Preconditions.checkArgument(columnFamilies != null, "Attempted to getCompactionsMatching in CompactionManager with no columnFamilies specified.");
+
+        List<Holder> matched = new ArrayList<>();
+        // consider all in-progress compactions
+        for (Holder holder : active.getCompactions())
+        {
+            CompactionInfo info = holder.getCompactionInfo();
+            if (info.getTableMetadata() == null || Iterables.contains(columnFamilies, info.getTableMetadata()))
+            {
+                if (predicate.test(info))
+                    matched.add(holder);
+            }
+        }
+        return matched;
+    }
+
     /**
      * Try to stop all of the compactions for given ColumnFamilies.
      *
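
The new getCompactionsMatching helper above is what the ColumnFamilyStore change uses to find tasks that block a requested operation. A small detached sketch of the same filtering idea (hypothetical types, not the Cassandra ones):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    class MatchingCompactionsSketch
    {
        static class TaskInfo
        {
            final String table;   // null for tasks not tied to a single table
            final int priority;

            TaskInfo(String table, int priority)
            {
                this.table = table;
                this.priority = priority;
            }
        }

        // Keep every in-progress task that either is global (no table) or touches one
        // of the affected tables, and that also satisfies the caller's predicate
        // (e.g. "runs at the same or higher priority than the requested operation").
        static List<TaskInfo> matching(List<TaskInfo> inProgress, List<String> tables, Predicate<TaskInfo> predicate)
        {
            List<TaskInfo> matched = new ArrayList<>();
            for (TaskInfo info : inProgress)
            {
                if ((info.table == null || tables.contains(info.table)) && predicate.test(info))
                    matched.add(info);
            }
            return matched;
        }
    }
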
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index ca67ddb0ea..808ea9ecd6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -991,7 +991,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput)
+    public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput, OperationType operationType)
     {
         maybeReloadDiskBoundaries();
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
@@ -1012,7 +1012,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                 readLock.unlock();
             }
             return CompactionTasks.create(tasks);
-        }, false, false);
+        }, operationType, false, false);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index e957e42c9d..a15693fe83 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -20,35 +20,51 @@ package org.apache.cassandra.db.compaction;
 public enum OperationType
 {
     /** Each modification here should be also applied to {@link org.apache.cassandra.tools.nodetool.Stop#compactionType} */
-    COMPACTION("Compaction"),
-    VALIDATION("Validation"),
-    KEY_CACHE_SAVE("Key cache save"),
-    ROW_CACHE_SAVE("Row cache save"),
-    COUNTER_CACHE_SAVE("Counter cache save"),
-    CLEANUP("Cleanup"),
-    SCRUB("Scrub"),
-    UPGRADE_SSTABLES("Upgrade sstables"),
-    INDEX_BUILD("Secondary index build"),
-    /** Compaction for tombstone removal */
-    TOMBSTONE_COMPACTION("Tombstone Compaction"),
-    UNKNOWN("Unknown compaction type"),
-    ANTICOMPACTION("Anticompaction after repair"),
-    VERIFY("Verify"),
-    FLUSH("Flush"),
-    STREAM("Stream"),
-    WRITE("Write"),
-    VIEW_BUILD("View build"),
-    INDEX_SUMMARY("Index summary redistribution"),
-    RELOCATE("Relocate sstables to correct disk"),
-    GARBAGE_COLLECT("Remove deleted data");
+    P0("Cancel all operations", 0),
+
+    // Automation or operator-driven tasks
+    CLEANUP("Cleanup", 1),
+    SCRUB("Scrub", 1),
+    UPGRADE_SSTABLES("Upgrade sstables", 1),
+    VERIFY("Verify", 1),
+    MAJOR_COMPACTION("Major compaction", 1),
+    RELOCATE("Relocate sstables to correct disk", 1),
+    GARBAGE_COLLECT("Remove deleted data", 1),
+
+    // Internal SSTable writing
+    FLUSH("Flush", 1),
+    WRITE("Write", 1),
+
+    ANTICOMPACTION("Anticompaction after repair", 2),
+    VALIDATION("Validation", 3),
+
+    INDEX_BUILD("Secondary index build", 4),
+    VIEW_BUILD("View build", 4),
+
+    COMPACTION("Compaction", 5),
+    TOMBSTONE_COMPACTION("Tombstone Compaction", 5), // Compaction for tombstone removal
+    UNKNOWN("Unknown compaction type", 5),
+
+    STREAM("Stream", 6),
+    KEY_CACHE_SAVE("Key cache save", 6),
+    ROW_CACHE_SAVE("Row cache save", 6),
+    COUNTER_CACHE_SAVE("Counter cache save", 6),
+    INDEX_SUMMARY("Index summary redistribution", 6);
 
     public final String type;
     public final String fileName;
 
-    OperationType(String type)
+    // As of now, priority takes part only for interrupting tasks to give way to operator-driven tasks.
+    // Operation types that have a smaller number will be allowed to cancel ones that have larger numbers.
+    //
+    // Submitted tasks may be prioritised differently when forming a queue, if/when CASSANDRA-11218 is implemented.
+    public final int priority;
+
+    OperationType(String type, int priority)
     {
         this.type = type;
         this.fileName = type.toLowerCase().replace(" ", "");
+        this.priority = priority;
     }
 
     public static OperationType fromFileName(String fileName)
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index af9888a3f1..a993bac5ee 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -214,6 +214,11 @@ public class PendingAntiCompaction
             return null;
         }
 
+        protected AcquireResult acquireSSTables()
+        {
+            return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, OperationType.ANTICOMPACTION, false, false, false);
+        }
+
         public AcquireResult call()
         {
             logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
@@ -231,7 +236,7 @@ public class PendingAntiCompaction
                 {
                     // Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
                     // is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
-                    return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false, false);
+                    return acquireSSTables();
                 }
                 catch (SSTableAcquisitionException e)
                 {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 90e29f2eea..a0b643f0d3 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -73,9 +73,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher;
 import static org.apache.cassandra.distributed.impl.Instance.deserializeMessage;
-import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.finalizePropose;
 import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.validationRequest;
-import static org.apache.cassandra.net.Verb.FINALIZE_PROPOSE_MSG;
 import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
 import static org.apache.cassandra.service.StorageService.instance;
 import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
@@ -187,10 +185,14 @@ public class PreviewRepairTest extends TestBaseImpl
             previewRepairStarted.await();
             // this needs to finish before the preview repair is unpaused on node2
             cluster.get(1).callOnInstance(repair(options(false, false)));
+            RepairResult irResult = cluster.get(1).callOnInstance(repair(options(false, false)));
             continuePreviewRepair.signalAll();
             RepairResult rs = rsFuture.get();
-            assertFalse(rs.success); // preview repair should have failed
+            assertFalse(rs.success); // preview repair was started before IR, but has lower priority, so its task will get cancelled
             assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
+
+            assertTrue(irResult.success); // IR was started after preview repair, but has a higher priority, so it'll be allowed to finish
+            assertFalse(irResult.wasInconsistent);
         }
         finally
         {
@@ -226,34 +228,21 @@ public class PreviewRepairTest extends TestBaseImpl
                    .messagesMatching(validationRequest(previewRepairStarted, continuePreviewRepair))
                    .drop();
 
-            Condition irRepairStarted = newOneTimeCondition();
-            Condition continueIrRepair = newOneTimeCondition();
-            // this blocks the IR from committing, so we can reenable the preview
-            cluster.filters()
-                   .outbound()
-                   .verbs(FINALIZE_PROPOSE_MSG.id)
-                   .from(1).to(2)
-                   .messagesMatching(finalizePropose(irRepairStarted, continueIrRepair))
-                   .drop();
-
             Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call();
             previewRepairStarted.await();
 
-            // trigger IR and wait till its ready to commit
+            // trigger IR and wait till it's ready to commit
             Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call();
-            irRepairStarted.await();
+            RepairResult ir = irResult.get();
+            assertTrue(ir.success); // IR was submitted after preview repair has acquired sstables, but has higher priority
+            assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
 
             // unblock preview repair and wait for it to complete
             continuePreviewRepair.signalAll();
 
             RepairResult rs = previewResult.get();
-            assertFalse(rs.success); // preview repair should have failed
+            assertFalse(rs.success); // preview repair was started earlier than IR session; but has smaller priority
             assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
-
-            continueIrRepair.signalAll();
-            RepairResult ir = irResult.get();
-            assertTrue(ir.success);
-            assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
         }
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
index c599f17e87..445e349885 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
@@ -19,22 +19,209 @@
 package org.apache.cassandra.distributed.test;
 
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
 
 public class UpgradeSSTablesTest extends TestBaseImpl
 {
+    @Test
+    public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+            cluster.get(1).acceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                cfs.disableAutoCompaction();
+                CompactionManager.instance.setMaximumCompactorThreads(1);
+                CompactionManager.instance.setCoreCompactorThreads(1);
+            }).accept(KEYSPACE);
+
+            String blob = "blob";
+            for (int i = 0; i < 6; i++)
+                blob += blob;
+
+            for (int cnt = 0; cnt < 5; cnt++)
+            {
+                for (int i = 0; i < 100; i++)
+                {
+                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+                                                   ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
+                }
+                cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+            }
+
+            LogAction logAction = cluster.get(1).logs();
+            logAction.mark();
+            Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION);
+            }).apply(KEYSPACE);
+            Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+            future.get();
+            Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
+        }
+    }
+
+    @Test
+    public void compactionDoesNotCancelUpgradeSSTables() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+            cluster.get(1).acceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                cfs.disableAutoCompaction();
+                CompactionManager.instance.setMaximumCompactorThreads(1);
+                CompactionManager.instance.setCoreCompactorThreads(1);
+            }).accept(KEYSPACE);
+
+            String blob = "blob";
+            for (int i = 0; i < 6; i++)
+                blob += blob;
+
+            for (int cnt = 0; cnt < 5; cnt++)
+            {
+                for (int i = 0; i < 100; i++)
+                {
+                    cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+                                                   ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
+                }
+                cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+            }
+
+            LogAction logAction = cluster.get(1).logs();
+            logAction.mark();
+            Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+            Assert.assertFalse(logAction.watchFor("Compacting").getResult().isEmpty());
+
+            cluster.get(1).acceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                FBUtilities.allOf(CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION))
+                           .awaitUninterruptibly(1, TimeUnit.MINUTES);
+
+            }).accept(KEYSPACE);
+            Assert.assertTrue(logAction.grep("Compaction interrupted").getResult().isEmpty());
+            Assert.assertFalse(logAction.grep("Finished Upgrade sstables").getResult().isEmpty());
+            Assert.assertFalse(logAction.grep("Compacted (.*) 5 sstables to").getResult().isEmpty());
+        }
+    }
+
+    @Test
+    public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start()))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+
+            cluster.get(1).acceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                cfs.disableAutoCompaction();
+            }).accept(KEYSPACE);
+
+            String blob = "blob";
+            for (int i = 0; i < 6; i++)
+                blob += blob;
+
+            for (int i = 0; i < 10000; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+                                               ConsistencyLevel.QUORUM, i, i, blob);
+            }
+
+            cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+            LogAction logAction = cluster.get(1).logs();
+            logAction.mark();
+
+            // Start upgradingsstables - use BB to pause once inside ActiveCompactions.beginCompaction
+            Thread upgradeThread = new Thread(() -> {
+                cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
+            });
+            upgradeThread.start();
+            Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
+
+            // Start a scrub and make sure that it fails, log check later to make sure it was
+            // because it cannot cancel the active upgrade sstables
+            Assert.assertNotEquals(0, cluster.get(1).nodetool("scrub", KEYSPACE, "tbl"));
+
+            // Now resume the upgrade sstables so test can shut down
+            cluster.get(1).runOnInstance(() -> {
+                BB.start.decrement();
+            });
+            upgradeThread.join();
+
+            Assert.assertFalse(logAction.grep("Unable to cancel in-progress compactions, since they're running with higher or same priority: Upgrade sstables").getResult().isEmpty());
+            Assert.assertFalse(logAction.grep("Starting Scrub for ").getResult().isEmpty());
+            Assert.assertFalse(logAction.grep("Finished Upgrade sstables for distributed_test_keyspace.tbl successfully").getResult().isEmpty());
+        }
+    }
+
+    @Test
+    public void truncateWhileUpgrading() throws Throwable
+    {
+        try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) "));
+            cluster.get(1).acceptsOnInstance((String ks) -> {
+                ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+                cfs.disableAutoCompaction();
+                CompactionManager.instance.setMaximumCompactorThreads(1);
+                CompactionManager.instance.setCoreCompactorThreads(1);
+            }).accept(KEYSPACE);
+
+            String blob = "blob";
+            for (int i = 0; i < 10; i++)
+                blob += blob;
+
+            for (int i = 0; i < 500; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+                                               ConsistencyLevel.QUORUM, i, i, blob);
+                if (i > 0 && i % 100 == 0)
+                    cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+            }
+
+            LogAction logAction = cluster.get(1).logs();
+            logAction.mark();
+
+            Future<?> upgrade = CompletableFuture.runAsync(() -> {
+                cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
+            });
+
+            cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl"));
+            upgrade.get();
+            Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
+        }
+    }
+
     @Test
     public void rewriteSSTablesTest() throws Throwable
     {
@@ -116,4 +303,38 @@ public class UpgradeSSTablesTest extends TestBaseImpl
             }
         }
     }
+
+    public static class BB
+    {
+        // Will be initialized in the context of the instance class loader
+        static CountDownLatch starting = newCountDownLatch(1);
+        static CountDownLatch start = newCountDownLatch(1);
+
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(ActiveCompactions.class)
+                           .method(named("beginCompaction"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        @SuppressWarnings("unused")
+        public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall)
+        {
+            try
+            {
+                zuperCall.call();
+                if (ci.getCompactionInfo().getTaskType() == OperationType.UPGRADE_SSTABLES)
+                {
+                    starting.decrement();
+                    Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES));
+                }
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 733e46fd8d..a780cf1e26 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -198,9 +198,7 @@ public class LongLeveledCompactionStrategyTest
                 }
                 return null;
             }
-        }, true, true);
-
-
+        }, OperationType.COMPACTION, true, true);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 67421ba6d2..51da0c4431 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -86,14 +86,16 @@ public class CancelCompactionsTest extends CQLTester
             assertEquals(1, activeCompactions.size());
             assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting);
             // predicate requires the non-compacting sstables, should not cancel the one currently compacting:
-            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true);
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable),
+                                           OperationType.P0, false, false, true);
             assertEquals(1, activeCompactions.size());
             assertFalse(activeCompactions.get(0).isStopRequested());
 
             // predicate requires the compacting ones - make sure stop is requested and that when we abort that
             // compaction we actually run the callable (countdown the latch)
             CountDownLatch cdl = new CountDownLatch(1);
-            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, true));
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains,
+                                                                       OperationType.P0, false, false, true));
             t.start();
             while (!activeCompactions.get(0).isStopRequested())
                 Thread.sleep(100);
@@ -139,13 +141,16 @@ public class CancelCompactionsTest extends CQLTester
             expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
             assertEquals(compactingSSTables, expectedSSTables);
 
-            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true);
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false,
+                                           OperationType.P0, false, false, true);
             assertEquals(2, activeCompactions.size());
             assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
 
             CountDownLatch cdl = new CountDownLatch(1);
             // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1)
-            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true));
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; },
+                                                                       (sstable) -> first(sstable) > 50,
+                                                                       OperationType.P0, false, false, true));
             t.start();
             activeCompactions = getActiveCompactionsForTable(cfs);
             assertEquals(2, activeCompactions.size());
@@ -333,7 +338,8 @@ public class CancelCompactionsTest extends CQLTester
             }
         }
         assertTrue(foundCompaction);
-        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true);
+        cfs.runWithCompactionsDisabled(() -> { compactionsStopped.countDown(); return null; },
+                                       (sstable) -> true, OperationType.P0, false, false, true);
         // wait for the runWithCompactionsDisabled callable
         compactionsStopped.await();
         assertEquals(1, getActiveCompactionsForTable(cfs).size());
@@ -430,7 +436,8 @@ public class CancelCompactionsTest extends CQLTester
         Set<SSTableReader> sstables = new HashSet<>();
         try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
         {
-            getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false);
+            getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;},
+                                                                     OperationType.P0, false, false, false);
         }
         // the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here
         assertTrue(sstables.isEmpty());
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index a559478b87..c95b2dbb0e 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -41,17 +41,14 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 
-import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.concurrent.FutureTask;
-import org.apache.cassandra.concurrent.ImmediateExecutor;
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.Future;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.FutureTask;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -78,7 +75,10 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
@@ -664,15 +664,24 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
             {
                 @Override
                 public boolean apply(SSTableReader sstable)
+                {
+                    return true;
+                }
+            };
+
+            CompactionManager.instance.active.beginCompaction(holder);
+            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp)
+            {
+                protected PendingAntiCompaction.AcquireResult acquireSSTables()
                 {
                     cdl.countDown();
                     if (cdl.getCount() > 0)
                         throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
-                    return true;
+                    else
+                        CompactionManager.instance.active.finishCompaction(holder);
+                    return super.acquireSSTables();
                 }
             };
-            CompactionManager.instance.active.beginCompaction(holder);
-            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp);
             Future f = es.submit(acquisitionCallable);
             cdl.await();
             assertNotNull(f.get());

