You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2022/09/07 16:54:09 UTC
[cassandra] branch trunk updated: Introduce compaction priorities to prevent upgrade compaction inability to finish
This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 377e114cb1 Introduce compaction priorities to prevent upgrade compaction inability to finish
377e114cb1 is described below
commit 377e114cb1459895423c292cb0bf7f921fd30e43
Author: Josh McKenzie <jm...@apache.org>
AuthorDate: Thu Aug 25 15:27:24 2022 -0400
Introduce compaction priorities to prevent upgrade compaction inability to finish
Patch by Alex Petrov; reviewed by Josh McKenzie and Marcus Eriksson for CASSANDRA-17851
Co-authored-by: Alex Petrov <ol...@gmail.com>
Co-authored-by: Josh McKenzie <jm...@apache.org>
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 42 +++-
.../cassandra/db/compaction/CompactionManager.java | 131 +++++++-----
.../db/compaction/CompactionStrategyManager.java | 4 +-
.../cassandra/db/compaction/OperationType.java | 60 ++++--
.../cassandra/db/repair/PendingAntiCompaction.java | 7 +-
.../distributed/test/PreviewRepairTest.java | 31 +--
.../distributed/test/UpgradeSSTablesTest.java | 223 ++++++++++++++++++++-
.../LongLeveledCompactionStrategyTest.java | 4 +-
.../db/compaction/CancelCompactionsTest.java | 19 +-
.../db/repair/PendingAntiCompactionTest.java | 27 ++-
11 files changed, 421 insertions(+), 128 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d6c736e1e..a3548e313d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Introduce compaction priorities to prevent upgrade compaction inability to finish (CASSANDRA-17851)
* Prevent a user from manually removing ephemeral snapshots (CASSANDRA-17757)
* Remove dependency on Maven Ant Tasks (CASSANDRA-17750)
* Update ASM(9.1 to 9.3), Mockito(1.10.10 to 1.12.13) and ByteBuddy(3.2.4 to 4.7.0) (CASSANDRA-17835)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index de1033ae0c..e4b9d781e7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -83,6 +83,7 @@ import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionStrategyManager;
import org.apache.cassandra.db.compaction.OperationType;
@@ -1800,7 +1801,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
return session != null && sessions.contains(session);
};
return runWithCompactionsDisabled(() -> compactionStrategyManager.releaseRepairData(sessions),
- predicate, false, true, true);
+ predicate, OperationType.STREAM, false, true, true);
}
else
{
@@ -2539,7 +2540,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
cfs.runWithCompactionsDisabled((Callable<Void>) () -> {
cfs.data.reset(memtableFactory.create(new AtomicReference<>(CommitLogPosition.NONE), cfs.metadata, cfs));
return null;
- }, true, false);
+ }, OperationType.P0, true, false);
}
}
@@ -2628,7 +2629,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
}
};
- runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), true, true);
+ runWithCompactionsDisabled(FutureTask.callable(truncateRunnable), OperationType.P0, true, true);
viewManager.build();
@@ -2659,9 +2660,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
FBUtilities.waitOnFuture(dumpMemtable());
}
- public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, OperationType operationType, boolean interruptValidation, boolean interruptViews)
{
- return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews, true);
+ return runWithCompactionsDisabled(callable, (sstable) -> true, operationType, interruptValidation, interruptViews, true);
}
/**
@@ -2674,13 +2675,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
* @param interruptIndexes if we should interrupt compactions on indexes. NOTE: if you set this to true your sstablePredicate
* must be able to handle LocalPartitioner sstables!
*/
- public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes)
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, OperationType operationType, boolean interruptValidation, boolean interruptViews, boolean interruptIndexes)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
synchronized (this)
{
- logger.trace("Cancelling in-progress compactions for {}", metadata.name);
+ logger.debug("Cancelling in-progress compactions for {}", metadata.name);
Iterable<ColumnFamilyStore> toInterruptFor = interruptIndexes
? concatWithIndexes()
: Collections.singleton(this);
@@ -2689,9 +2690,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
? Iterables.concat(toInterruptFor, viewManager.allViewsCfs())
: toInterruptFor;
+ Iterable<TableMetadata> toInterruptForMetadata = Iterables.transform(toInterruptFor, ColumnFamilyStore::metadata);
+
try (CompactionManager.CompactionPauser pause = CompactionManager.instance.pauseGlobalCompaction();
CompactionManager.CompactionPauser pausedStrategies = pauseCompactionStrategies(toInterruptFor))
{
+ List<CompactionInfo.Holder> uninterruptibleTasks = CompactionManager.instance.getCompactionsMatching(toInterruptForMetadata,
+ (info) -> info.getTaskType().priority <= operationType.priority);
+ if (!uninterruptibleTasks.isEmpty())
+ {
+ logger.info("Unable to cancel in-progress compactions, since they're running with higher or same priority: {}. You can abort these operations using `nodetool stop`.",
+ uninterruptibleTasks.stream().map((compaction) -> String.format("%s@%s (%s)",
+ compaction.getCompactionInfo().getTaskType(),
+ compaction.getCompactionInfo().getTable(),
+ compaction.getCompactionInfo().getTaskId()))
+ .collect(Collectors.joining(",")));
+ return null;
+ }
+
// interrupt in-progress compactions
CompactionManager.instance.interruptCompactionForCFs(toInterruptFor, sstablesPredicate, interruptValidation);
CompactionManager.instance.waitForCessation(toInterruptFor, sstablesPredicate);
@@ -2701,7 +2717,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
{
if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
{
- logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
+ logger.warn("Unable to cancel in-progress compactions for {}. " +
+ "Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.",
+ metadata.name);
return null;
}
}
@@ -2756,7 +2774,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
return accumulate;
}
- public LifecycleTransaction markAllCompacting(final OperationType operationType)
+ public <T> T withAllSSTables(final OperationType operationType, Function<LifecycleTransaction, T> op)
{
Callable<LifecycleTransaction> callable = () -> {
assert data.getCompacting().isEmpty() : data.getCompacting();
@@ -2767,10 +2785,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
return modifier;
};
- return runWithCompactionsDisabled(callable, false, false);
+ try (LifecycleTransaction compacting = runWithCompactionsDisabled(callable, operationType, false, false))
+ {
+ return op.apply(compacting);
+ }
}
-
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5906ac294a..dc22b6712a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -369,68 +369,70 @@ public class CompactionManager implements CompactionManagerMBean
* @throws InterruptedException
*/
@SuppressWarnings("resource")
- private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType) throws ExecutionException, InterruptedException
+ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, OperationType operationType)
{
- logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName());
- List<LifecycleTransaction> transactions = new ArrayList<>();
- List<Future<?>> futures = new ArrayList<>();
- try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType))
- {
- if (compacting == null)
- return AllSSTableOpStatus.UNABLE_TO_CANCEL;
-
- Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
- if (Iterables.isEmpty(sstables))
+ return cfs.withAllSSTables(operationType, (compacting) -> {
+ logger.info("Starting {} for {}.{}", operationType, cfs.keyspace.getName(), cfs.getTableName());
+ List<LifecycleTransaction> transactions = new ArrayList<>();
+ List<Future<?>> futures = new ArrayList<>();
+ try
{
- logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.SUCCESSFUL;
- }
+ if (compacting == null)
+ return AllSSTableOpStatus.UNABLE_TO_CANCEL;
- for (final SSTableReader sstable : sstables)
- {
- final LifecycleTransaction txn = compacting.split(singleton(sstable));
- transactions.add(txn);
- Callable<Object> callable = new Callable<Object>()
+ Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
+ if (Iterables.isEmpty(sstables))
{
- @Override
- public Object call() throws Exception
- {
- operation.execute(txn);
- return this;
- }
- };
- Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation");
- if (!fut.isCancelled())
- futures.add(fut);
- else
- return AllSSTableOpStatus.ABORTED;
+ logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.SUCCESSFUL;
+ }
- if (jobs > 0 && futures.size() == jobs)
+ for (final SSTableReader sstable : sstables)
{
- Future<?> f = FBUtilities.waitOnFirstFuture(futures);
- futures.remove(f);
+ final LifecycleTransaction txn = compacting.split(singleton(sstable));
+ transactions.add(txn);
+ Callable<Object> callable = new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ operation.execute(txn);
+ return this;
+ }
+ };
+ Future<?> fut = executor.submitIfRunning(callable, "paralell sstable operation");
+ if (!fut.isCancelled())
+ futures.add(fut);
+ else
+ return AllSSTableOpStatus.ABORTED;
+
+ if (jobs > 0 && futures.size() == jobs)
+ {
+ Future<?> f = FBUtilities.waitOnFirstFuture(futures);
+ futures.remove(f);
+ }
}
- }
- FBUtilities.waitOnFutures(futures);
- assert compacting.originals().isEmpty();
- logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName());
- return AllSSTableOpStatus.SUCCESSFUL;
- }
- finally
- {
- // wait on any unfinished futures to make sure we don't close an ongoing transaction
- try
- {
FBUtilities.waitOnFutures(futures);
+ assert compacting.originals().isEmpty();
+ logger.info("Finished {} for {}.{} successfully", operationType, cfs.keyspace.getName(), cfs.getTableName());
+ return AllSSTableOpStatus.SUCCESSFUL;
}
- catch (Throwable t)
+ finally
{
- // these are handled/logged in CompactionExecutor#afterExecute
+ // wait on any unfinished futures to make sure we don't close an ongoing transaction
+ try
+ {
+ FBUtilities.waitOnFutures(futures);
+ }
+ catch (Throwable t)
+ {
+ // these are handled/logged in CompactionExecutor#afterExecute
+ }
+ Throwable fail = Throwables.close(null, transactions);
+ if (fail != null)
+ logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
}
- Throwable fail = Throwables.close(null, transactions);
- if (fail != null)
- logger.error("Failed to cleanup lifecycle transactions ({} for {}.{})", operationType, cfs.keyspace.getName(), cfs.getTableName(), fail);
- }
+ });
}
private static interface OneSSTableOperation
@@ -914,11 +916,17 @@ public class CompactionManager implements CompactionManagerMBean
@SuppressWarnings("resource") // the tasks are executed in parallel on the executor, making sure that they get closed
public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput)
+ {
+ return submitMaximal(cfStore, gcBefore, splitOutput, OperationType.MAJOR_COMPACTION);
+ }
+
+ @SuppressWarnings("resource")
+ public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore, boolean splitOutput, OperationType operationType)
{
// here we compute the task off the compaction executor, so having that present doesn't
// confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
// for ourselves to finish/acknowledge cancellation before continuing.
- CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput);
+ CompactionTasks tasks = cfStore.getCompactionStrategyManager().getMaximalTasks(gcBefore, splitOutput, operationType);
if (tasks.isEmpty())
return Collections.emptyList();
@@ -963,6 +971,7 @@ public class CompactionManager implements CompactionManagerMBean
try (CompactionTasks tasks = cfStore.runWithCompactionsDisabled(taskCreator,
sstablesPredicate,
+ OperationType.MAJOR_COMPACTION,
false,
false,
false))
@@ -2265,6 +2274,24 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ public List<Holder> getCompactionsMatching(Iterable<TableMetadata> columnFamilies, Predicate<CompactionInfo> predicate)
+ {
+ Preconditions.checkArgument(columnFamilies != null, "Attempted to getCompactionsMatching in CompactionManager with no columnFamilies specified.");
+
+ List<Holder> matched = new ArrayList<>();
+ // consider all in-progress compactions
+ for (Holder holder : active.getCompactions())
+ {
+ CompactionInfo info = holder.getCompactionInfo();
+ if (info.getTableMetadata() == null || Iterables.contains(columnFamilies, info.getTableMetadata()))
+ {
+ if (predicate.test(info))
+ matched.add(holder);
+ }
+ }
+ return matched;
+ }
+
/**
* Try to stop all of the compactions for given ColumnFamilies.
*
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index ca67ddb0ea..808ea9ecd6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -991,7 +991,7 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
- public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput)
+ public CompactionTasks getMaximalTasks(final int gcBefore, final boolean splitOutput, OperationType operationType)
{
maybeReloadDiskBoundaries();
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
@@ -1012,7 +1012,7 @@ public class CompactionStrategyManager implements INotificationConsumer
readLock.unlock();
}
return CompactionTasks.create(tasks);
- }, false, false);
+ }, operationType, false, false);
}
/**
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index e957e42c9d..a15693fe83 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -20,35 +20,51 @@ package org.apache.cassandra.db.compaction;
public enum OperationType
{
/** Each modification here should be also applied to {@link org.apache.cassandra.tools.nodetool.Stop#compactionType} */
- COMPACTION("Compaction"),
- VALIDATION("Validation"),
- KEY_CACHE_SAVE("Key cache save"),
- ROW_CACHE_SAVE("Row cache save"),
- COUNTER_CACHE_SAVE("Counter cache save"),
- CLEANUP("Cleanup"),
- SCRUB("Scrub"),
- UPGRADE_SSTABLES("Upgrade sstables"),
- INDEX_BUILD("Secondary index build"),
- /** Compaction for tombstone removal */
- TOMBSTONE_COMPACTION("Tombstone Compaction"),
- UNKNOWN("Unknown compaction type"),
- ANTICOMPACTION("Anticompaction after repair"),
- VERIFY("Verify"),
- FLUSH("Flush"),
- STREAM("Stream"),
- WRITE("Write"),
- VIEW_BUILD("View build"),
- INDEX_SUMMARY("Index summary redistribution"),
- RELOCATE("Relocate sstables to correct disk"),
- GARBAGE_COLLECT("Remove deleted data");
+ P0("Cancel all operations", 0),
+
+ // Automation or operator-driven tasks
+ CLEANUP("Cleanup", 1),
+ SCRUB("Scrub", 1),
+ UPGRADE_SSTABLES("Upgrade sstables", 1),
+ VERIFY("Verify", 1),
+ MAJOR_COMPACTION("Major compaction", 1),
+ RELOCATE("Relocate sstables to correct disk", 1),
+ GARBAGE_COLLECT("Remove deleted data", 1),
+
+ // Internal SSTable writing
+ FLUSH("Flush", 1),
+ WRITE("Write", 1),
+
+ ANTICOMPACTION("Anticompaction after repair", 2),
+ VALIDATION("Validation", 3),
+
+ INDEX_BUILD("Secondary index build", 4),
+ VIEW_BUILD("View build", 4),
+
+ COMPACTION("Compaction", 5),
+ TOMBSTONE_COMPACTION("Tombstone Compaction", 5), // Compaction for tombstone removal
+ UNKNOWN("Unknown compaction type", 5),
+
+ STREAM("Stream", 6),
+ KEY_CACHE_SAVE("Key cache save", 6),
+ ROW_CACHE_SAVE("Row cache save", 6),
+ COUNTER_CACHE_SAVE("Counter cache save", 6),
+ INDEX_SUMMARY("Index summary redistribution", 6);
public final String type;
public final String fileName;
- OperationType(String type)
+ // As of now, priority takes part only for interrupting tasks to give way to operator-driven tasks.
+ // Operation types that have a smaller number will be allowed to cancel ones that have larger numbers.
+ //
+ // Submitted tasks may be prioritised differently when forming a queue, if/when CASSANDRA-11218 is implemented.
+ public final int priority;
+
+ OperationType(String type, int priority)
{
this.type = type;
this.fileName = type.toLowerCase().replace(" ", "");
+ this.priority = priority;
}
public static OperationType fromFileName(String fileName)
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index af9888a3f1..a993bac5ee 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -214,6 +214,11 @@ public class PendingAntiCompaction
return null;
}
+ protected AcquireResult acquireSSTables()
+ {
+ return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, OperationType.ANTICOMPACTION, false, false, false);
+ }
+
public AcquireResult call()
{
logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
@@ -231,7 +236,7 @@ public class PendingAntiCompaction
{
// Note that anticompactions are not disabled when running this. This is safe since runWithCompactionsDisabled
// is synchronized - acquireTuple and predicate can only be run by a single thread (for the given cfs).
- return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false, false);
+ return acquireSSTables();
}
catch (SSTableAcquisitionException e)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 90e29f2eea..a0b643f0d3 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -73,9 +73,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.IMessageFilters.Matcher;
import static org.apache.cassandra.distributed.impl.Instance.deserializeMessage;
-import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.finalizePropose;
import static org.apache.cassandra.distributed.test.PreviewRepairTest.DelayFirstRepairTypeMessageFilter.validationRequest;
-import static org.apache.cassandra.net.Verb.FINALIZE_PROPOSE_MSG;
import static org.apache.cassandra.net.Verb.VALIDATION_REQ;
import static org.apache.cassandra.service.StorageService.instance;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
@@ -187,10 +185,14 @@ public class PreviewRepairTest extends TestBaseImpl
previewRepairStarted.await();
// this needs to finish before the preview repair is unpaused on node2
cluster.get(1).callOnInstance(repair(options(false, false)));
+ RepairResult irResult = cluster.get(1).callOnInstance(repair(options(false, false)));
continuePreviewRepair.signalAll();
RepairResult rs = rsFuture.get();
- assertFalse(rs.success); // preview repair should have failed
+ assertFalse(rs.success); // preview repair was started before IR, but has lower priority, so its task will get cancelled
assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
+
+ assertTrue(irResult.success); // IR was started after preview repair, but has a higher priority, so it'll be allowed to finish
+ assertFalse(irResult.wasInconsistent);
}
finally
{
@@ -226,34 +228,21 @@ public class PreviewRepairTest extends TestBaseImpl
.messagesMatching(validationRequest(previewRepairStarted, continuePreviewRepair))
.drop();
- Condition irRepairStarted = newOneTimeCondition();
- Condition continueIrRepair = newOneTimeCondition();
- // this blocks the IR from committing, so we can reenable the preview
- cluster.filters()
- .outbound()
- .verbs(FINALIZE_PROPOSE_MSG.id)
- .from(1).to(2)
- .messagesMatching(finalizePropose(irRepairStarted, continueIrRepair))
- .drop();
-
Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call();
previewRepairStarted.await();
- // trigger IR and wait till its ready to commit
+ // trigger IR and wait till it's ready to commit
Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call();
- irRepairStarted.await();
+ RepairResult ir = irResult.get();
+ assertTrue(ir.success); // IR was submitted after preview repair has acquired sstables, but has higher priority
+ assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
// unblock preview repair and wait for it to complete
continuePreviewRepair.signalAll();
RepairResult rs = previewResult.get();
- assertFalse(rs.success); // preview repair should have failed
+ assertFalse(rs.success); // preview repair was started earlier than IR session; but has smaller priority
assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
-
- continueIrRepair.signalAll();
- RepairResult ir = irResult.get();
- assertTrue(ir.success);
- assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
index c599f17e87..445e349885 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/UpgradeSSTablesTest.java
@@ -19,22 +19,209 @@
package org.apache.cassandra.distributed.test;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
public class UpgradeSSTablesTest extends TestBaseImpl
{
+ @Test
+ public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ cfs.disableAutoCompaction();
+ CompactionManager.instance.setMaximumCompactorThreads(1);
+ CompactionManager.instance.setCoreCompactorThreads(1);
+ }).accept(KEYSPACE);
+
+ String blob = "blob";
+ for (int i = 0; i < 6; i++)
+ blob += blob;
+
+ for (int cnt = 0; cnt < 5; cnt++)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+ ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
+ }
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+ }
+
+ LogAction logAction = cluster.get(1).logs();
+ logAction.mark();
+ Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION);
+ }).apply(KEYSPACE);
+ Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+ future.get();
+ Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
+ }
+ }
+
+ @Test
+ public void compactionDoesNotCancelUpgradeSSTables() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ cfs.disableAutoCompaction();
+ CompactionManager.instance.setMaximumCompactorThreads(1);
+ CompactionManager.instance.setCoreCompactorThreads(1);
+ }).accept(KEYSPACE);
+
+ String blob = "blob";
+ for (int i = 0; i < 6; i++)
+ blob += blob;
+
+ for (int cnt = 0; cnt < 5; cnt++)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+ ConsistencyLevel.QUORUM, (cnt * 1000) + i, i, blob);
+ }
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+ }
+
+ LogAction logAction = cluster.get(1).logs();
+ logAction.mark();
+ Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
+ Assert.assertFalse(logAction.watchFor("Compacting").getResult().isEmpty());
+
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ FBUtilities.allOf(CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION))
+ .awaitUninterruptibly(1, TimeUnit.MINUTES);
+
+ }).accept(KEYSPACE);
+ Assert.assertTrue(logAction.grep("Compaction interrupted").getResult().isEmpty());
+ Assert.assertFalse(logAction.grep("Finished Upgrade sstables").getResult().isEmpty());
+ Assert.assertFalse(logAction.grep("Compacted (.*) 5 sstables to").getResult().isEmpty());
+ }
+ }
+
+ @Test
+ public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start()))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
+
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ cfs.disableAutoCompaction();
+ }).accept(KEYSPACE);
+
+ String blob = "blob";
+ for (int i = 0; i < 6; i++)
+ blob += blob;
+
+ for (int i = 0; i < 10000; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?,?,?)",
+ ConsistencyLevel.QUORUM, i, i, blob);
+ }
+
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+
+ LogAction logAction = cluster.get(1).logs();
+ logAction.mark();
+
+ // Start upgradingsstables - use BB to pause once inside ActiveCompactions.beginCompaction
+ Thread upgradeThread = new Thread(() -> {
+ cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
+ });
+ upgradeThread.start();
+ Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
+
+ // Start a scrub and make sure that it fails, log check later to make sure it was
+ // because it cannot cancel the active upgrade sstables
+ Assert.assertNotEquals(0, cluster.get(1).nodetool("scrub", KEYSPACE, "tbl"));
+
+ // Now resume the upgrade sstables so test can shut down
+ cluster.get(1).runOnInstance(() -> {
+ BB.start.decrement();
+ });
+ upgradeThread.join();
+
+ Assert.assertFalse(logAction.grep("Unable to cancel in-progress compactions, since they're running with higher or same priority: Upgrade sstables").getResult().isEmpty());
+ Assert.assertFalse(logAction.grep("Starting Scrub for ").getResult().isEmpty());
+ Assert.assertFalse(logAction.grep("Finished Upgrade sstables for distributed_test_keyspace.tbl successfully").getResult().isEmpty());
+ }
+ }
+
+ @Test
+ public void truncateWhileUpgrading() throws Throwable
+ {
+ try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) "));
+ cluster.get(1).acceptsOnInstance((String ks) -> {
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
+ cfs.disableAutoCompaction();
+ CompactionManager.instance.setMaximumCompactorThreads(1);
+ CompactionManager.instance.setCoreCompactorThreads(1);
+ }).accept(KEYSPACE);
+
+ String blob = "blob";
+ for (int i = 0; i < 10; i++)
+ blob += blob;
+
+ for (int i = 0; i < 500; i++)
+ {
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (pk, ck, v) VALUES (?,?,?)"),
+ ConsistencyLevel.QUORUM, i, i, blob);
+ if (i > 0 && i % 100 == 0)
+ cluster.get(1).nodetool("flush", KEYSPACE, "tbl");
+ }
+
+ LogAction logAction = cluster.get(1).logs();
+ logAction.mark();
+
+ Future<?> upgrade = CompletableFuture.runAsync(() -> {
+ cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
+ });
+
+ cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl"));
+ upgrade.get();
+ Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
+ }
+ }
+
@Test
public void rewriteSSTablesTest() throws Throwable
{
@@ -116,4 +303,38 @@ public class UpgradeSSTablesTest extends TestBaseImpl
}
}
}
+
+ public static class BB // ByteBuddy hook: pauses an UPGRADE_SSTABLES compaction right after the original beginCompaction has run
+ {
+ // Will be initialized in the context of the instance class loader; starting is counted down when an upgrade compaction has begun, start is counted down by the test to let it continue
+ static CountDownLatch starting = newCountDownLatch(1); // decremented in beginCompaction once an UPGRADE_SSTABLES task has begun
+ static CountDownLatch start = newCountDownLatch(1); // decremented by the test (BB.start.decrement()) to release the paused upgrade
+
+ public static void install(ClassLoader classLoader, Integer num) // num is unused; presumably the instance number supplied by the dtest initializer hook - TODO confirm
+ {
+ new ByteBuddy().rebase(ActiveCompactions.class) // redefine ActiveCompactions inside the given (instance) class loader
+ .method(named("beginCompaction"))
+ .intercept(MethodDelegation.to(BB.class)) // route beginCompaction calls to BB.beginCompaction below
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ @SuppressWarnings("unused")
+ public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall) // reached only via ByteBuddy delegation; zuperCall is the original method
+ {
+ try
+ {
+ zuperCall.call(); // run the original beginCompaction first, then (possibly) pause
+ if (ci.getCompactionInfo().getTaskType() == OperationType.UPGRADE_SSTABLES)
+ {
+ starting.decrement(); // signal the test that an upgrade compaction has begun
+ Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES)); // block until the test releases it; assertion fails if not released within 1 minute
+ }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // Callable.call() declares Exception; an AssertionError is an Error and is not caught here
+ }
+ }
+ }
}
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 733e46fd8d..a780cf1e26 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -198,9 +198,7 @@ public class LongLeveledCompactionStrategyTest
}
return null;
}
- }, true, true);
-
-
+ }, OperationType.COMPACTION, true, true);
}
@Test
diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
index 67421ba6d2..51da0c4431 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -86,14 +86,16 @@ public class CancelCompactionsTest extends CQLTester
assertEquals(1, activeCompactions.size());
assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting);
// predicate requires the non-compacting sstables, should not cancel the one currently compacting:
- cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false, true);
+ cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable),
+ OperationType.P0, false, false, true);
assertEquals(1, activeCompactions.size());
assertFalse(activeCompactions.get(0).isStopRequested());
// predicate requires the compacting ones - make sure stop is requested and that when we abort that
// compaction we actually run the callable (countdown the latch)
CountDownLatch cdl = new CountDownLatch(1);
- Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false, true));
+ Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains,
+ OperationType.P0, false, false, true));
t.start();
while (!activeCompactions.get(0).isStopRequested())
Thread.sleep(100);
@@ -139,13 +141,16 @@ public class CancelCompactionsTest extends CQLTester
expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
assertEquals(compactingSSTables, expectedSSTables);
- cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false, true);
+ cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false,
+ OperationType.P0, false, false, true);
assertEquals(2, activeCompactions.size());
assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
CountDownLatch cdl = new CountDownLatch(1);
// start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1)
- Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false, true));
+ Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; },
+ (sstable) -> first(sstable) > 50,
+ OperationType.P0, false, false, true));
t.start();
activeCompactions = getActiveCompactionsForTable(cfs);
assertEquals(2, activeCompactions.size());
@@ -333,7 +338,8 @@ public class CancelCompactionsTest extends CQLTester
}
}
assertTrue(foundCompaction);
- cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false, true);
+ cfs.runWithCompactionsDisabled(() -> { compactionsStopped.countDown(); return null; },
+ (sstable) -> true, OperationType.P0, false, false, true);
// wait for the runWithCompactionsDisabled callable
compactionsStopped.await();
assertEquals(1, getActiveCompactionsForTable(cfs).size());
@@ -430,7 +436,8 @@ public class CancelCompactionsTest extends CQLTester
Set<SSTableReader> sstables = new HashSet<>();
try (LifecycleTransaction txn = idx.getTracker().tryModify(idx.getLiveSSTables(), OperationType.COMPACTION))
{
- getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;}, false, false, false);
+ getCurrentColumnFamilyStore().runWithCompactionsDisabled(() -> true, (sstable) -> { sstables.add(sstable); return true;},
+ OperationType.P0, false, false, false);
}
// the predicate only gets compacting sstables, and we are only compacting the 2i sstables - with interruptIndexes = false we should see no sstables here
assertTrue(sstables.isEmpty());
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index a559478b87..c95b2dbb0e 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -41,17 +41,14 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.ExecutorPlus;
-import org.apache.cassandra.concurrent.FutureTask;
-import org.apache.cassandra.concurrent.ImmediateExecutor;
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.FutureTask;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -78,7 +75,10 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.Util;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -664,15 +664,24 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
@Override
public boolean apply(SSTableReader sstable)
+ {
+ return true;
+ }
+ };
+
+ CompactionManager.instance.active.beginCompaction(holder);
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp)
+ {
+ protected PendingAntiCompaction.AcquireResult acquireSSTables()
{
cdl.countDown();
if (cdl.getCount() > 0)
throw new PendingAntiCompaction.SSTableAcquisitionException("blah");
- return true;
+ else
+ CompactionManager.instance.active.finishCompaction(holder);
+ return super.acquireSSTables();
}
};
- CompactionManager.instance.active.beginCompaction(holder);
- PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, nextTimeUUID(), 10, 1, acp);
Future f = es.submit(acquisitionCallable);
cdl.await();
assertNotNull(f.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org