You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/01/17 07:02:11 UTC
[cassandra] branch trunk updated: Only cancel conflicting
compactions when starting anticompactions and sub range compactions
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6e27e65 Only cancel conflicting compactions when starting anticompactions and sub range compactions
6e27e65 is described below
commit 6e27e65f150ed6cc6a7205b9d7c89a763f3256fa
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Dec 18 17:01:02 2018 +0100
Only cancel conflicting compactions when starting anticompactions and sub range compactions
Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14935
---
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 12 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 13 +-
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
.../db/compaction/AbstractCompactionTask.java | 7 +-
.../cassandra/db/compaction/ActiveCompactions.java | 66 ++++
.../db/compaction/ActiveCompactionsTracker.java | 34 ++
.../cassandra/db/compaction/CompactionInfo.java | 108 ++++--
.../db/compaction/CompactionIterator.java | 27 +-
.../cassandra/db/compaction/CompactionManager.java | 149 +++++---
.../cassandra/db/compaction/CompactionTask.java | 15 +-
.../db/compaction/LeveledCompactionStrategy.java | 4 +-
.../db/compaction/PendingRepairManager.java | 2 +-
.../cassandra/db/compaction/SSTableSplitter.java | 17 +-
.../apache/cassandra/db/compaction/Scrubber.java | 4 +-
.../db/compaction/SingleSSTableLCSTask.java | 2 +-
.../apache/cassandra/db/compaction/Verifier.java | 4 +-
.../db/repair/CassandraValidationIterator.java | 8 +-
.../cassandra/db/repair/PendingAntiCompaction.java | 150 ++++----
.../apache/cassandra/db/view/ViewBuilderTask.java | 12 +-
src/java/org/apache/cassandra/index/Index.java | 2 +-
.../cassandra/index/internal/CassandraIndex.java | 7 +-
.../index/internal/CollatedViewIndexBuilder.java | 9 +-
.../cassandra/index/sasi/SASIIndexBuilder.java | 3 +-
.../cassandra/io/sstable/ISSTableScanner.java | 4 +-
.../io/sstable/IndexSummaryRedistribution.java | 2 +-
.../io/sstable/format/big/BigTableScanner.java | 10 +-
.../cassandra/metrics/CompactionMetrics.java | 28 +-
src/java/org/apache/cassandra/schema/Schema.java | 2 +-
.../db/compaction/LongCompactionsTest.java | 2 +-
.../LongLeveledCompactionStrategyTest.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 3 +-
.../db/compaction/AbstractPendingRepairTest.java | 6 +-
.../db/compaction/ActiveCompactionsTest.java | 194 ++++++++++
.../db/compaction/CancelCompactionsTest.java | 421 +++++++++++++++++++++
.../db/compaction/CompactionIteratorTest.java | 5 +-
...CompactionStrategyManagerPendingRepairTest.java | 8 +-
.../db/compaction/CompactionTaskTest.java | 2 +-
.../db/compaction/CompactionsCQLTest.java | 4 +-
.../db/compaction/CompactionsPurgeTest.java | 4 +-
.../compaction/LeveledCompactionStrategyTest.java | 6 +-
.../db/compaction/SingleSSTableLCSTaskTest.java | 4 +-
.../db/repair/PendingAntiCompactionTest.java | 189 ++++++---
.../index/internal/CustomCassandraIndex.java | 9 +-
.../io/sstable/IndexSummaryManagerTest.java | 4 +-
.../org/apache/cassandra/schema/MockSchema.java | 4 +-
46 files changed, 1222 insertions(+), 349 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e1da11..f44ac72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935)
* Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
* Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
* Make antiCompactGroup throw exception on error and anticompaction non cancellable
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index d30b292..d9827e1 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -316,12 +316,12 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
else
type = OperationType.UNKNOWN;
- info = new CompactionInfo(TableMetadata.minimal(SchemaConstants.SYSTEM_KEYSPACE_NAME, cacheType.toString()),
- type,
- 0,
- keysEstimate,
- Unit.KEYS,
- UUIDGen.getTimeUUID());
+ info = CompactionInfo.withoutSSTables(TableMetadata.minimal(SchemaConstants.SYSTEM_KEYSPACE_NAME, cacheType.toString()),
+ type,
+ 0,
+ keysEstimate,
+ Unit.KEYS,
+ UUIDGen.getTimeUUID());
}
public CacheService.CacheType cacheType()
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cc6af6f..adac934 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2185,6 +2185,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
{
+ return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews);
+ }
+
+ public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews)
+ {
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
synchronized (this)
@@ -2196,17 +2201,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
: concatWithIndexes();
for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
- cfs.getCompactionStrategyManager().pause();
+ cfs.getCompactionStrategyManager().pause(); // todo: make sure anticompaction pauses!
try
{
// interrupt in-progress compactions
- CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
- CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
+ CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, sstablesPredicate, interruptValidation);
+ CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs, sstablesPredicate);
// doublecheck that we finished, instead of timing out
for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
{
- if (!cfs.getTracker().getCompacting().isEmpty())
+ if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
{
logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
return null;
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 33c0f32..bc382ee 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -384,7 +384,7 @@ public class Keyspace
return;
cfs.getCompactionStrategyManager().shutdown();
- CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true);
+ CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), (sstable) -> true, true);
// wait for any outstanding reads/writes that might affect the CFS
cfs.keyspace.writeOrder.awaitNewBarrier();
cfs.readOrdering.awaitNewBarrier();
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index c542a51..989c21c 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -94,11 +93,11 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
/**
* executes the task and unmarks sstables compacting
*/
- public int execute(CompactionExecutorStatsCollector collector)
+ public int execute(ActiveCompactionsTracker activeCompactions)
{
try
{
- return executeInternal(collector);
+ return executeInternal(activeCompactions);
}
catch(FSDiskFullWriteError e)
{
@@ -113,7 +112,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
}
public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
- protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
+ protected abstract int executeInternal(ActiveCompactionsTracker activeCompactions);
public AbstractCompactionTask setUserDefined(boolean isUserDefined)
{
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
new file mode 100644
index 0000000..b289ef9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public class ActiveCompactions implements ActiveCompactionsTracker
+{
+ // a synchronized identity set holding the CompactionInfo.Holder of every compaction that has begun but not yet finished
+ private final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+
+ public List<CompactionInfo.Holder> getCompactions()
+ {
+ return new ArrayList<>(compactions);
+ }
+
+ public void beginCompaction(CompactionInfo.Holder ci)
+ {
+ compactions.add(ci);
+ }
+
+ public void finishCompaction(CompactionInfo.Holder ci)
+ {
+ compactions.remove(ci);
+ CompactionManager.instance.getMetrics().bytesCompacted.inc(ci.getCompactionInfo().getTotal());
+ CompactionManager.instance.getMetrics().totalCompactionsCompleted.mark();
+ }
+
+ public CompactionInfo getCompactionForSSTable(SSTableReader sstable)
+ {
+ CompactionInfo toReturn = null;
+ for (CompactionInfo.Holder holder : compactions)
+ {
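+ // todo: change compactions datastructure to avoid iterating all active compactions; NOTE(review): this loop iterates the synchronizedSet without holding its lock - confirm beginCompaction/finishCompaction cannot race with it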
+ if (holder.getCompactionInfo().getSSTables().contains(sstable))
+ {
+ if (toReturn != null)
+ throw new IllegalStateException("SSTable "+sstable+" involved in several compactions");
+ toReturn = holder.getCompactionInfo();
+ }
+ }
+ return toReturn;
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java
new file mode 100644
index 0000000..c1bbbd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+public interface ActiveCompactionsTracker
+{
+ public void beginCompaction(CompactionInfo.Holder ci);
+ public void finishCompaction(CompactionInfo.Holder ci);
+
+ public static final ActiveCompactionsTracker NOOP = new ActiveCompactionsTracker()
+ {
+ public void beginCompaction(CompactionInfo.Holder ci)
+ {}
+
+ public void finishCompaction(CompactionInfo.Holder ci)
+ {}
+ };
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 99df259..7c950c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -17,19 +17,22 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
+import java.util.function.Predicate;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
-/** Implements serializable to allow structured info to be returned via JMX. */
-public final class CompactionInfo implements Serializable
+public final class CompactionInfo
{
- private static final long serialVersionUID = 3695381572726744816L;
-
public static final String ID = "id";
public static final String KEYSPACE = "keyspace";
public static final String COLUMNFAMILY = "columnfamily";
@@ -38,6 +41,7 @@ public final class CompactionInfo implements Serializable
public static final String TASK_TYPE = "taskType";
public static final String UNIT = "unit";
public static final String COMPACTION_ID = "compactionId";
+ public static final String SSTABLES = "sstables";
private final TableMetadata metadata;
private final OperationType tasktype;
@@ -45,41 +49,19 @@ public final class CompactionInfo implements Serializable
private final long total;
private final Unit unit;
private final UUID compactionId;
+ private final ImmutableSet<SSTableReader> sstables;
- public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
+ public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId, Collection<SSTableReader> sstables)
{
- this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId);
+ this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId, sstables);
}
- public static enum Unit
+ public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables)
{
- BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
-
- private final String name;
-
- private Unit(String name)
- {
- this.name = name;
- }
-
- @Override
- public String toString()
- {
- return this.name;
- }
-
- public static boolean isFileSize(String unit)
- {
- return BYTES.toString().equals(unit);
- }
- }
-
- public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
- {
- this(null, tasktype, completed, total, unit, compactionId);
+ this(null, tasktype, completed, total, unit, compactionId, sstables);
}
- public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
+ private CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables)
{
this.tasktype = tasktype;
this.completed = completed;
@@ -87,12 +69,22 @@ public final class CompactionInfo implements Serializable
this.metadata = metadata;
this.unit = unit;
this.compactionId = compactionId;
+ this.sstables = ImmutableSet.copyOf(sstables);
+ }
+
+ /**
+ * Creates a compaction info with an empty sstable set, which makes shouldStop return true for any predicate, so the compaction is always
+ * cancelled - used for example by ViewBuilderTask and AutoSavingCache, where we don't know the sstables at construction
+ */
+ public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
+ {
+ return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of());
}
/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
{
- return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId);
+ return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables);
}
public Optional<String> getKeyspace()
@@ -135,10 +127,16 @@ public final class CompactionInfo implements Serializable
return unit;
}
+ public Set<SSTableReader> getSSTables()
+ {
+ return sstables;
+ }
+
public String toString()
{
StringBuilder buff = new StringBuilder();
buff.append(getTaskType());
+
if (metadata != null)
{
buff.append('@').append(metadata.id).append('(');
@@ -148,8 +146,13 @@ public final class CompactionInfo implements Serializable
{
buff.append('(');
}
- buff.append(getCompleted()).append('/').append(getTotal());
- return buff.append(')').append(unit).toString();
+ buff.append(getCompleted())
+ .append('/')
+ .append(getTotal())
+ .append(')')
+ .append(unit);
+
+ return buff.toString();
}
public Map<String, String> asMap()
@@ -163,9 +166,19 @@ public final class CompactionInfo implements Serializable
ret.put(TASK_TYPE, tasktype.toString());
ret.put(UNIT, unit.toString());
ret.put(COMPACTION_ID, compactionId == null ? "" : compactionId.toString());
+ ret.put(SSTABLES, Joiner.on(',').join(sstables));
return ret;
}
+ boolean shouldStop(Predicate<SSTableReader> sstablePredicate)
+ {
+ if (sstables.isEmpty())
+ {
+ return true;
+ }
+ return sstables.stream().anyMatch(sstablePredicate);
+ }
+
public static abstract class Holder
{
private volatile boolean stopRequested = false;
@@ -181,4 +194,27 @@ public final class CompactionInfo implements Serializable
return stopRequested;
}
}
+
+ public enum Unit
+ {
+ BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
+
+ private final String name;
+
+ Unit(String name)
+ {
+ this.name = name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.name;
+ }
+
+ public static boolean isFileSize(String unit)
+ {
+ return BYTES.toString().equals(unit);
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index c73520d..0aba594 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.function.LongPredicate;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -32,7 +34,6 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.transactions.CompactionTransaction;
import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
/**
@@ -58,6 +59,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final OperationType type;
private final CompactionController controller;
private final List<ISSTableScanner> scanners;
+ private final ImmutableSet<SSTableReader> sstables;
private final int nowInSec;
private final UUID compactionId;
@@ -73,20 +75,20 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final long[] mergeCounters;
private final UnfilteredPartitionIterator compacted;
- private final CompactionMetrics metrics;
+ private final ActiveCompactionsTracker activeCompactions;
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
{
- this(type, scanners, controller, nowInSec, compactionId, null, true);
+ this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, true);
}
- public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
{
- this(type, scanners, controller, nowInSec, compactionId, metrics, true);
+ this(type, scanners, controller, nowInSec, compactionId, activeCompactions, true);
}
@SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
- public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics, boolean abortable)
+ public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions, boolean abortable)
{
this.controller = controller;
this.type = type;
@@ -100,10 +102,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
bytes += scanner.getLengthInBytes();
this.totalBytes = bytes;
this.mergeCounters = new long[scanners.size()];
- this.metrics = metrics;
-
- if (metrics != null)
- metrics.beginCompaction(this);
+ this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
+ this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions
UnfilteredPartitionIterator merged = scanners.isEmpty()
? EmptyIterators.unfilteredPartition(controller.cfs.metadata())
@@ -114,6 +114,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
else
compacted = merged;
+ sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
}
public TableMetadata metadata()
@@ -127,7 +128,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
type,
bytesRead,
totalBytes,
- compactionId);
+ compactionId,
+ sstables);
}
private void updateCounterFor(int rows)
@@ -254,8 +256,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
}
finally
{
- if (metrics != null)
- metrics.finishCompaction(this);
+ activeCompactions.finishCompaction(this);
}
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3eebd75..5998983 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -128,6 +128,8 @@ public class CompactionManager implements CompactionManagerMBean
@VisibleForTesting
final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
+ public final ActiveCompactions active = new ActiveCompactions();
+
private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
public CompactionMetrics getMetrics()
@@ -203,10 +205,10 @@ public class CompactionManager implements CompactionManagerMBean
return futures;
}
- public boolean isCompacting(Iterable<ColumnFamilyStore> cfses)
+ public boolean isCompacting(Iterable<ColumnFamilyStore> cfses, Predicate<SSTableReader> sstablePredicate)
{
for (ColumnFamilyStore cfs : cfses)
- if (!cfs.getTracker().getCompacting().isEmpty())
+ if (cfs.getTracker().getCompacting().stream().anyMatch(sstablePredicate))
return true;
return false;
}
@@ -224,7 +226,7 @@ public class CompactionManager implements CompactionManagerMBean
cacheCleanupExecutor.shutdown();
// interrupt compactions and validations
- for (Holder compactionHolder : CompactionMetrics.getCompactions())
+ for (Holder compactionHolder : active.getCompactions())
{
compactionHolder.stop();
}
@@ -286,7 +288,7 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- task.execute(metrics);
+ task.execute(active);
ranCompaction = true;
}
}
@@ -308,7 +310,7 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask();
if (upgradeTask != null)
{
- upgradeTask.execute(metrics);
+ upgradeTask.execute(active);
return true;
}
}
@@ -443,7 +445,7 @@ public class CompactionManager implements CompactionManagerMBean
@Override
public void execute(LifecycleTransaction input)
{
- scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL);
+ scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL, active);
}
}, jobs, OperationType.SCRUB);
}
@@ -462,7 +464,7 @@ public class CompactionManager implements CompactionManagerMBean
@Override
public void execute(LifecycleTransaction input)
{
- verifyOne(cfs, input.onlyOne(), options);
+ verifyOne(cfs, input.onlyOne(), options, active);
}
}, 0, OperationType.VERIFY);
}
@@ -495,7 +497,7 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
task.setUserDefined(true);
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
- task.execute(metrics);
+ task.execute(active);
}
}, jobs, OperationType.UPGRADE_SSTABLES);
}
@@ -566,7 +568,7 @@ public class CompactionManager implements CompactionManagerMBean
};
task.setUserDefined(true);
task.setCompactionType(OperationType.GARBAGE_COLLECT);
- task.execute(metrics);
+ task.execute(active);
}
}, jobs, OperationType.GARBAGE_COLLECT);
}
@@ -638,7 +640,7 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
task.setUserDefined(true);
task.setCompactionType(OperationType.RELOCATE);
- task.execute(metrics);
+ task.execute(active);
}
}, jobs, OperationType.RELOCATE);
}
@@ -831,7 +833,7 @@ public class CompactionManager implements CompactionManagerMBean
{
protected void runMayThrow()
{
- task.execute(metrics);
+ task.execute(active);
}
};
@@ -857,7 +859,7 @@ public class CompactionManager implements CompactionManagerMBean
return null;
}
return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
- }, false, false);
+ }, (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), false, false);
if (tasks == null)
return;
@@ -868,7 +870,7 @@ public class CompactionManager implements CompactionManagerMBean
{
for (AbstractCompactionTask task : tasks)
if (task != null)
- task.execute(metrics);
+ task.execute(active);
}
};
@@ -1011,7 +1013,7 @@ public class CompactionManager implements CompactionManagerMBean
for (AbstractCompactionTask task : tasks)
{
if (task != null)
- task.execute(metrics);
+ task.execute(active);
}
}
}
@@ -1047,37 +1049,39 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL)
+ @VisibleForTesting
+ void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, ActiveCompactionsTracker activeCompactions)
{
CompactionInfo.Holder scrubInfo = null;
try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL))
{
scrubInfo = scrubber.getScrubInfo();
- metrics.beginCompaction(scrubInfo);
+ activeCompactions.beginCompaction(scrubInfo);
scrubber.scrub();
}
finally
{
if (scrubInfo != null)
- metrics.finishCompaction(scrubInfo);
+ activeCompactions.finishCompaction(scrubInfo);
}
}
- private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options)
+ @VisibleForTesting
+ void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options, ActiveCompactionsTracker activeCompactions)
{
CompactionInfo.Holder verifyInfo = null;
try (Verifier verifier = new Verifier(cfs, sstable, false, options))
{
verifyInfo = verifier.getVerifyInfo();
- metrics.beginCompaction(verifyInfo);
+ activeCompactions.beginCompaction(verifyInfo);
verifier.verify();
}
finally
{
if (verifyInfo != null)
- metrics.finishCompaction(verifyInfo);
+ activeCompactions.finishCompaction(verifyInfo);
}
}
@@ -1198,7 +1202,7 @@ public class CompactionManager implements CompactionManagerMBean
ISSTableScanner scanner = cleanupStrategy.getScanner(sstable);
CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable));
- CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+ CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), active))
{
StatsMetadata metadata = sstable.getSSTableMetadata();
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn));
@@ -1495,7 +1499,7 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
- CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+ CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), active))
{
int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
@@ -1556,28 +1560,26 @@ public class CompactionManager implements CompactionManagerMBean
}
@VisibleForTesting
- public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, CompactionMetrics metrics)
+ public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker activeCompactions)
{
- return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, metrics, false);
+ return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions, false);
}
- /**
- * Is not scheduled, because it is performing disjoint work from sstable compaction.
- */
- public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+ @VisibleForTesting
+ ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder, ActiveCompactionsTracker activeCompactions)
{
Runnable runnable = new Runnable()
{
public void run()
{
- metrics.beginCompaction(builder);
+ activeCompactions.beginCompaction(builder);
try
{
builder.build();
}
finally
{
- metrics.finishCompaction(builder);
+ activeCompactions.finishCompaction(builder);
}
}
};
@@ -1585,8 +1587,21 @@ public class CompactionManager implements CompactionManagerMBean
return executor.submitIfRunning(runnable, "index build");
}
+ /**
+ * Is not scheduled, because it is performing disjoint work from sstable compaction.
+ */
+ public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+ {
+ return submitIndexBuild(builder, active);
+ }
+
public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
{
+ return submitCacheWrite(writer, active);
+ }
+
+ Future<?> submitCacheWrite(final AutoSavingCache.Writer writer, ActiveCompactionsTracker activeCompactions)
+ {
Runnable runnable = new Runnable()
{
public void run()
@@ -1598,14 +1613,14 @@ public class CompactionManager implements CompactionManagerMBean
}
try
{
- metrics.beginCompaction(writer);
+ activeCompactions.beginCompaction(writer);
try
{
writer.saveCache();
}
finally
{
- metrics.finishCompaction(writer);
+ activeCompactions.finishCompaction(writer);
}
}
finally
@@ -1620,15 +1635,20 @@ public class CompactionManager implements CompactionManagerMBean
public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
{
- metrics.beginCompaction(redistribution);
+ return runIndexSummaryRedistribution(redistribution, active);
+ }
+ @VisibleForTesting
+ List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution, ActiveCompactionsTracker activeCompactions) throws IOException
+ {
+ activeCompactions.beginCompaction(redistribution);
try
{
return redistribution.redistributeSummaries();
}
finally
{
- metrics.finishCompaction(redistribution);
+ activeCompactions.finishCompaction(redistribution);
}
}
@@ -1641,22 +1661,28 @@ public class CompactionManager implements CompactionManagerMBean
public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task)
{
+ return submitViewBuilder(task, active);
+ }
+
+ @VisibleForTesting
+ ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task, ActiveCompactionsTracker activeCompactions)
+ {
return viewBuildExecutor.submitIfRunning(() -> {
- metrics.beginCompaction(task);
+ activeCompactions.beginCompaction(task);
try
{
return task.call();
}
finally
{
- metrics.finishCompaction(task);
+ activeCompactions.finishCompaction(task);
}
}, "view build");
}
public int getActiveCompactions()
{
- return CompactionMetrics.getCompactions().size();
+ return active.getCompactions().size();
}
static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
@@ -1779,13 +1805,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- public interface CompactionExecutorStatsCollector
- {
- void beginCompaction(CompactionInfo.Holder ci);
-
- void finishCompaction(CompactionInfo.Holder ci);
- }
-
public void incrementAborted()
{
metrics.compactionsAborted.inc();
@@ -1804,7 +1823,7 @@ public class CompactionManager implements CompactionManagerMBean
public List<Map<String, String>> getCompactions()
{
- List<Holder> compactionHolders = CompactionMetrics.getCompactions();
+ List<Holder> compactionHolders = active.getCompactions();
List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().asMap());
@@ -1813,7 +1832,7 @@ public class CompactionManager implements CompactionManagerMBean
public List<String> getCompactionSummary()
{
- List<Holder> compactionHolders = CompactionMetrics.getCompactions();
+ List<Holder> compactionHolders = active.getCompactions();
List<String> out = new ArrayList<String>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().toString());
@@ -1855,7 +1874,7 @@ public class CompactionManager implements CompactionManagerMBean
public void stopCompaction(String type)
{
OperationType operation = OperationType.valueOf(type);
- for (Holder holder : CompactionMetrics.getCompactions())
+ for (Holder holder : active.getCompactions())
{
if (holder.getCompactionInfo().getTaskType() == operation)
holder.stop();
@@ -1864,7 +1883,7 @@ public class CompactionManager implements CompactionManagerMBean
public void stopCompactionById(String compactionId)
{
- for (Holder holder : CompactionMetrics.getCompactions())
+ for (Holder holder : active.getCompactions())
{
UUID holderId = holder.getCompactionInfo().getTaskId();
if (holderId != null && holderId.equals(UUID.fromString(compactionId)))
@@ -2004,41 +2023,45 @@ public class CompactionManager implements CompactionManagerMBean
* isCompacting if you want that behavior.
*
* @param columnFamilies The ColumnFamilies to try to stop compaction upon.
+ * @param sstablePredicate the sstable predicate to match on
* @param interruptValidation true if validation operations for repair should also be interrupted
- *
*/
- public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, boolean interruptValidation)
+ public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
{
assert columnFamilies != null;
// interrupt in-progress compactions
- for (Holder compactionHolder : CompactionMetrics.getCompactions())
+ for (Holder compactionHolder : active.getCompactions())
{
CompactionInfo info = compactionHolder.getCompactionInfo();
if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
continue;
if (Iterables.contains(columnFamilies, info.getTableMetadata()))
- compactionHolder.stop(); // signal compaction to stop
+ {
+ if (info.shouldStop(sstablePredicate))
+ compactionHolder.stop();
+ }
}
}
- public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation)
+ public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
{
List<TableMetadata> metadata = new ArrayList<>();
for (ColumnFamilyStore cfs : cfss)
metadata.add(cfs.metadata());
- interruptCompactionFor(metadata, interruptValidation);
+ interruptCompactionFor(metadata, sstablePredicate, interruptValidation);
}
- public void waitForCessation(Iterable<ColumnFamilyStore> cfss)
+ public void waitForCessation(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate)
{
long start = System.nanoTime();
long delay = TimeUnit.MINUTES.toNanos(1);
+
while (System.nanoTime() - start < delay)
{
- if (CompactionManager.instance.isCompacting(cfss))
+ if (CompactionManager.instance.isCompacting(cfss, sstablePredicate))
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
else
break;
@@ -2047,12 +2070,12 @@ public class CompactionManager implements CompactionManagerMBean
public List<CompactionInfo> getSSTableTasks()
{
- return CompactionMetrics.getCompactions()
- .stream()
- .map(CompactionInfo.Holder::getCompactionInfo)
- .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
- && task.getTaskType() != OperationType.KEY_CACHE_SAVE
- && task.getTaskType() != OperationType.ROW_CACHE_SAVE)
- .collect(Collectors.toList());
+ return active.getCompactions()
+ .stream()
+ .map(CompactionInfo.Holder::getCompactionInfo)
+ .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
+ && task.getTaskType() != OperationType.KEY_CACHE_SAVE
+ && task.getTaskType() != OperationType.ROW_CACHE_SAVE)
+ .collect(Collectors.toList());
}
}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 591b7c4..685da2e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -53,7 +52,7 @@ public class CompactionTask extends AbstractCompactionTask
protected final int gcBefore;
protected final boolean keepOriginals;
protected static long totalBytesCompacted = 0;
- private CompactionExecutorStatsCollector collector;
+ private ActiveCompactionsTracker activeCompactions;
public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore)
{
@@ -78,9 +77,9 @@ public class CompactionTask extends AbstractCompactionTask
return totalBytesCompacted += bytesCompacted;
}
- protected int executeInternal(CompactionExecutorStatsCollector collector)
+ protected int executeInternal(ActiveCompactionsTracker activeCompactions)
{
- this.collector = collector;
+ this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
run();
return transaction.originals().size();
}
@@ -188,8 +187,7 @@ public class CompactionTask extends AbstractCompactionTask
if (!controller.cfs.getCompactionStrategyManager().isActive())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- if (collector != null)
- collector.beginCompaction(ci);
+ activeCompactions.beginCompaction(ci);
try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
{
@@ -219,11 +217,8 @@ public class CompactionTask extends AbstractCompactionTask
}
finally
{
- if (collector != null)
- collector.finishCompaction(ci);
-
+ activeCompactions.finishCompaction(ci);
mergedRowCounts = ci.getMergedRowCounts();
-
totalSourceCQLRows = ci.getTotalSourceCQLRows();
}
}
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3437094..74ffccb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -493,9 +493,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
return currentScanner == null ? totalBytesScanned : totalBytesScanned + currentScanner.getBytesScanned();
}
- public String getBackingFiles()
+ public Set<SSTableReader> getBackingSSTables()
{
- return Joiner.on(", ").join(sstables);
+ return ImmutableSet.copyOf(sstables);
}
}
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 1aa156f..b2d70f7 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -479,7 +479,7 @@ class PendingRepairManager
throw new UnsupportedOperationException();
}
- protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+ protected int executeInternal(ActiveCompactionsTracker activeCompactions)
{
run();
return transaction.originals().size();
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index e9ae429..1746d7c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -31,8 +31,6 @@ public class SSTableSplitter
{
private final SplittingCompactionTask task;
- private CompactionInfo.Holder info;
-
public SSTableSplitter(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
{
this.task = new SplittingCompactionTask(cfs, transaction, sstableSizeInMB);
@@ -40,20 +38,7 @@ public class SSTableSplitter
public void split()
{
- task.execute(new StatsCollector());
- }
-
- public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
- {
- public void beginCompaction(CompactionInfo.Holder ci)
- {
- SSTableSplitter.this.info = ci;
- }
-
- public void finishCompaction(CompactionInfo.Holder ci)
- {
- // no-op
- }
+ task.execute(ActiveCompactionsTracker.NOOP);
}
public static class SplittingCompactionTask extends CompactionTask
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index aa41051..63c89e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -23,6 +23,7 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -475,7 +476,8 @@ public class Scrubber implements Closeable
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length(),
- scrubCompactionId);
+ scrubCompactionId,
+ ImmutableSet.of(sstable));
}
catch (Exception e)
{
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
index 3522d61..02a1c49 100644
--- a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -57,7 +57,7 @@ public class SingleSSTableLCSTask extends AbstractCompactionTask
}
@Override
- protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+ protected int executeInternal(ActiveCompactionsTracker activeCompactions)
{
run();
return 1;
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 446d527..161620a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -500,7 +501,8 @@ public class Verifier implements Closeable
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
- verificationCompactionId);
+ verificationCompactionId,
+ ImmutableSet.of(sstable));
}
catch (Exception e)
{
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index caf1b8e..d653f6c 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionIterator;
@@ -107,9 +109,9 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
private static class ValidationCompactionIterator extends CompactionIterator
{
- public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics)
+ public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, ActiveCompactionsTracker activeCompactions)
{
- super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics);
+ super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), activeCompactions);
}
}
@@ -226,7 +228,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
Preconditions.checkArgument(sstables != null);
controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec));
scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
- ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.getMetrics());
+ ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.active);
long allPartitions = 0;
rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size());
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 1bc2fce..6040ea8 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.repair;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -29,8 +28,8 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -41,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -86,97 +86,116 @@ public class PendingAntiCompaction
}
}
- static class SSTableAcquisitionException extends RuntimeException {}
+ static class SSTableAcquisitionException extends RuntimeException
+ {
+ SSTableAcquisitionException(String message)
+ {
+ super(message);
+ }
+ }
- static class AcquisitionCallable implements Callable<AcquireResult>
+ @VisibleForTesting
+ static class AntiCompactionPredicate implements Predicate<SSTableReader>
{
- private final ColumnFamilyStore cfs;
private final Collection<Range<Token>> ranges;
- private final UUID sessionID;
+ private final UUID prsid;
- public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+ public AntiCompactionPredicate(Collection<Range<Token>> ranges, UUID prsid)
{
- this.cfs = cfs;
this.ranges = ranges;
- this.sessionID = sessionID;
+ this.prsid = prsid;
}
- private Iterable<SSTableReader> getSSTables()
+ public boolean apply(SSTableReader sstable)
{
- Set<UUID> conflictingSessions = new HashSet<>();
+ if (!sstable.intersects(ranges))
+ return false;
- Iterable<SSTableReader> sstables = cfs.getLiveSSTables().stream().filter(sstable -> {
- if (!sstable.intersects(ranges))
- return false;
+ StatsMetadata metadata = sstable.getSSTableMetadata();
- StatsMetadata metadata = sstable.getSSTableMetadata();
-
- // exclude repaired sstables
- if (metadata.repairedAt != UNREPAIRED_SSTABLE)
- return false;
+ // exclude repaired sstables
+ if (metadata.repairedAt != UNREPAIRED_SSTABLE)
+ return false;
- // exclude sstables pending repair, but record session ids for
- // non-finalized sessions for a later error message
- if (metadata.pendingRepair != NO_PENDING_REPAIR)
+ // exclude sstables pending repair; if one belongs to a session that is not yet
+ // finalized, fail the acquisition immediately with a descriptive error message
+ if (metadata.pendingRepair != NO_PENDING_REPAIR)
+ {
+ if (!ActiveRepairService.instance.consistent.local.isSessionFinalized(metadata.pendingRepair))
{
- if (!ActiveRepairService.instance.consistent.local.isSessionFinalized(metadata.pendingRepair))
- {
- conflictingSessions.add(metadata.pendingRepair);
- }
- return false;
+ String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " +
+ "intersecting sstables belonging to another incremental repair session (%s). This is " +
+ "caused by starting an incremental repair session before a previous one has completed. " +
+ "Check nodetool repair_admin for hung sessions and fix them.", prsid, metadata.pendingRepair);
+ throw new SSTableAcquisitionException(message);
}
-
- return true;
- }).collect(Collectors.toList());
-
- // If there are sstables we'd like to acquire that are currently held by other sessions, we need to bail out. If we
- // didn't bail out here and the other repair sessions we're seeing were to fail, incremental repair behavior would be
- // confusing. You generally expect all data received before a repair session to be repaired when the session completes,
- // and that wouldn't be the case if the other session failed and moved it's data back to unrepaired.
- if (!conflictingSessions.isEmpty())
+ return false;
+ }
+ CompactionInfo ci = CompactionManager.instance.active.getCompactionForSSTable(sstable);
+ if (ci != null && ci.getTaskType() == OperationType.ANTICOMPACTION)
{
- logger.warn("Prepare phase for incremental repair session {} has failed because it encountered " +
- "intersecting sstables belonging to another incremental repair session(s) ({}). This is " +
- "caused by starting an incremental repair session before a previous one has completed. " +
- "Check nodetool repair_admin for hung sessions and fix them.",
- sessionID, conflictingSessions);
- throw new SSTableAcquisitionException();
+ // todo: start tracking the parent repair session id that created the anticompaction to be able to give a better error message here:
+ String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " +
+ "intersecting sstables belonging to another incremental repair session. This is " +
+ "caused by starting multiple conflicting incremental repairs at the same time", prsid);
+ throw new SSTableAcquisitionException(message);
}
+ return true;
+ }
+ }
+
+ static class AcquisitionCallable implements Callable<AcquireResult>
+ {
+ private final ColumnFamilyStore cfs;
+ private final UUID sessionID;
+ private final AntiCompactionPredicate predicate;
- return sstables;
+ public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+ {
+ this.cfs = cfs;
+ this.sessionID = sessionID;
+ predicate = new AntiCompactionPredicate(ranges, sessionID);
}
@SuppressWarnings("resource")
private AcquireResult acquireTuple()
{
- List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
- if (sstables.isEmpty())
- return new AcquireResult(cfs, null, null);
+ // this method runs with compactions stopped & disabled
+ try
+ {
+ // using predicate might throw if there are conflicting ranges
+ Set<SSTableReader> sstables = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
+ if (sstables.isEmpty())
+ return new AcquireResult(cfs, null, null);
+
+ LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ if (txn != null)
+ return new AcquireResult(cfs, Refs.ref(sstables), txn);
+ }
+ catch (SSTableAcquisitionException e)
+ {
+ logger.warn(e.getMessage());
+ logger.debug("Got exception trying to acquire sstables", e);
+ }
- LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
- if (txn != null)
- return new AcquireResult(cfs, Refs.ref(sstables), txn);
- else
- return null;
+ return null;
}
- public AcquireResult call() throws Exception
+ public AcquireResult call()
{
logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
+ // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions including the given sstables for
+ // up to a minute, after which point, null will be returned
try
{
- AcquireResult refTxn = acquireTuple();
- if (refTxn != null)
- return refTxn;
+ return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
}
catch (SSTableAcquisitionException e)
{
- return null;
+ logger.warn(e.getMessage());
+ logger.debug("Got exception trying to acquire sstables", e);
}
-
- // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions for
- // up to a minute, after which point, null will be returned
- return cfs.runWithCompactionsDisabled(this::acquireTuple, false, false);
+ return null;
}
}
@@ -223,11 +242,12 @@ public class PendingAntiCompaction
result.abort();
}
}
- logger.warn("Prepare phase for incremental repair session {} was unable to " +
- "acquire exclusive access to the neccesary sstables. " +
- "This is usually caused by running multiple incremental repairs on nodes that share token ranges",
- parentRepairSession);
- return Futures.immediateFailedFuture(new SSTableAcquisitionException());
+ String message = String.format("Prepare phase for incremental repair session %s was unable to " +
+ "acquire exclusive access to the neccesary sstables. " +
+ "This is usually caused by running multiple incremental repairs on nodes that share token ranges",
+ parentRepairSession);
+ logger.warn(message);
+ return Futures.immediateFailedFuture(new SSTableAcquisitionException(message));
}
else
{
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index db78a54..d041b48 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -26,8 +26,10 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.util.concurrent.Futures;
@@ -76,7 +78,8 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
private volatile boolean isStopped = false;
private volatile boolean isCompactionInterrupted = false;
- ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
+ @VisibleForTesting
+ public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
{
this.baseCfs = baseCfs;
this.view = view;
@@ -195,17 +198,20 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
@Override
public CompactionInfo getCompactionInfo()
{
+ // we don't know the sstables at construction of ViewBuilderTask; we could change this to return them once they are known,
+ // but since we basically only cancel view builds on truncation, where we cancel all compactions anyway, this seems reasonable
+
// If there's splitter, calculate progress based on last token position
if (range.left.getPartitioner().splitter().isPresent())
{
long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
- return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
+ return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
}
// When there is no splitter, estimate based on number of total keys but
// take the max with keysBuilt + 1 to avoid having more completed than total
long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
- return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
+ return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
}
@Override
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 7019038..b9f38ce 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -159,7 +159,7 @@ public interface Index
@SuppressWarnings("resource")
public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables)
{
- return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables));
+ return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables), sstables);
}
}
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index abe2460..ecd25fd 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -660,8 +660,8 @@ public abstract class CassandraIndex implements Index
{
// interrupt in-progress compactions
Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
- CompactionManager.instance.interruptCompactionForCFs(cfss, true);
- CompactionManager.instance.waitForCessation(cfss);
+ CompactionManager.instance.interruptCompactionForCFs(cfss, (sstable) -> true, true);
+ CompactionManager.instance.waitForCessation(cfss, (sstable) -> true);
Keyspace.writeOrder.awaitNewBarrier();
indexCfs.forceBlockingFlush();
indexCfs.readOrdering.awaitNewBarrier();
@@ -709,7 +709,8 @@ public abstract class CassandraIndex implements Index
SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
Collections.singleton(this),
- new ReducingKeyIterator(sstables));
+ new ReducingKeyIterator(sstables),
+ ImmutableSet.copyOf(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();
diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
index 076346a..3c005c4 100644
--- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.index.internal;
+import java.util.Collection;
import java.util.Set;
import java.util.UUID;
@@ -28,6 +29,7 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexBuilder;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -39,13 +41,15 @@ public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
private final Set<Index> indexers;
private final ReducingKeyIterator iter;
private final UUID compactionId;
+ private final Collection<SSTableReader> sstables;
- public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+ public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter, Collection<SSTableReader> sstables)
{
this.cfs = cfs;
this.indexers = indexers;
this.iter = iter;
this.compactionId = UUIDGen.getTimeUUID();
+ this.sstables = sstables;
}
public CompactionInfo getCompactionInfo()
@@ -54,7 +58,8 @@ public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
OperationType.INDEX_BUILD,
iter.getBytesRead(),
iter.getTotalBytes(),
- compactionId);
+ compactionId,
+ sstables);
}
public void build()
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index a01e45b..bb42dc2 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -127,7 +127,8 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
OperationType.INDEX_BUILD,
bytesProcessed,
totalSizeInBytes,
- compactionId);
+ compactionId,
+ sstables.keySet());
}
private long getPrimaryIndexLength(SSTable sstable)
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 1c1d74b..af661b7 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -20,10 +20,12 @@
package org.apache.cassandra.io.sstable;
import java.util.Collection;
+import java.util.Set;
import com.google.common.base.Throwables;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -36,7 +38,7 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator
public long getCompressedLengthInBytes();
public long getCurrentPosition();
public long getBytesScanned();
- public String getBackingFiles();
+ public Set<SSTableReader> getBackingSSTables();
public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable)
{
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0e1d0f0..b4fca41 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -301,7 +301,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
public CompactionInfo getCompactionInfo()
{
- return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
+ return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId, compacting);
}
/** Utility class for sorting sstables by their read rates. */
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index cb25c6b..5097d96 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.AbstractIterator;
+
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.cassandra.db.*;
@@ -241,9 +243,9 @@ public class BigTableScanner implements ISSTableScanner
return sstable.onDiskLength();
}
- public String getBackingFiles()
+ public Set<SSTableReader> getBackingSSTables()
{
- return sstable.toString();
+ return ImmutableSet.of(sstable);
}
@@ -420,9 +422,9 @@ public class BigTableScanner implements ISSTableScanner
return 0;
}
- public String getBackingFiles()
+ public Set<SSTableReader> getBackingSSTables()
{
- return sstable.getFilename();
+ return ImmutableSet.of(sstable);
}
public TableMetadata metadata()
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index d54090b..46e5940 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -26,8 +26,10 @@ import com.codahale.metrics.Meter;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
@@ -36,13 +38,10 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
/**
* Metrics for compaction.
*/
-public class CompactionMetrics implements CompactionManager.CompactionExecutorStatsCollector
+public class CompactionMetrics
{
public static final MetricNameFactory factory = new DefaultNameFactory("Compaction");
- // a synchronized identity set of running tasks to their compaction info
- private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
-
/** Estimated number of compactions remaining to perform */
public final Gauge<Integer> pendingTasks;
/** Estimated number of compactions remaining to perform, group by keyspace and then table name */
@@ -79,7 +78,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
n += cfs.getCompactionStrategyManager().getEstimatedRemainingTasks();
}
// add number of currently running compactions
- return n + compactions.size();
+ return n + CompactionManager.instance.active.getCompactions().size();
}
});
@@ -108,7 +107,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
}
// currently running compactions
- for (CompactionInfo.Holder compaction : compactions)
+ for (CompactionInfo.Holder compaction : CompactionManager.instance.active.getCompactions())
{
TableMetadata metaData = compaction.getCompactionInfo().getTableMetadata();
if (metaData == null)
@@ -153,21 +152,4 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
sstablesDropppedFromCompactions = Metrics.counter(factory.createMetricName("SSTablesDroppedFromCompaction"));
compactionsAborted = Metrics.counter(factory.createMetricName("CompactionsAborted"));
}
-
- public void beginCompaction(CompactionInfo.Holder ci)
- {
- compactions.add(ci);
- }
-
- public void finishCompaction(CompactionInfo.Holder ci)
- {
- compactions.remove(ci);
- bytesCompacted.inc(ci.getCompactionInfo().getTotal());
- totalCompactionsCompleted.mark();
- }
-
- public static List<CompactionInfo.Holder> getCompactions()
- {
- return new ArrayList<CompactionInfo.Holder>(compactions);
- }
}
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 970d9ac..2c2d444 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -759,7 +759,7 @@ public final class Schema
assert cfs != null;
// make sure all the indexes are dropped, or else.
cfs.indexManager.markAllIndexesRemoved();
- CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), true);
+ CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), (sstable) -> true, true);
if (DatabaseDescriptor.isAutoSnapshot())
cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 912b03f..fe8cdc2 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -127,7 +127,7 @@ public class LongCompactionsTest
try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION))
{
assert txn != null : "Cannot markCompacting all sstables";
- new CompactionTask(store, txn, gcBefore).execute(null);
+ new CompactionTask(store, txn, gcBefore).execute(ActiveCompactionsTracker.NOOP);
}
System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
this.getClass().getName(),
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 3bcd9d1..f8f94a0 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -97,7 +97,7 @@ public class LongLeveledCompactionStrategyTest
{
public void run()
{
- nextTask.execute(null);
+ nextTask.execute(ActiveCompactionsTracker.NOOP);
}
});
}
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index d413fd7..3054be6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -40,6 +40,7 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -245,7 +246,7 @@ public class Util
int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
for (AbstractCompactionTask task : tasks)
- task.execute(null);
+ task.execute(ActiveCompactionsTracker.NOOP);
}
public static void expectEOF(Callable<?> callable)
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index 4d62894..431fb32 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -114,7 +114,7 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
return sstable;
}
- protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
+ public static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
{
try
{
@@ -127,12 +127,12 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
}
}
- protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
+ public static void mutateRepaired(SSTableReader sstable, long repairedAt)
{
mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
}
- protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
+ public static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
{
mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient);
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
new file mode 100644
index 0000000..23e393d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.db.view.ViewBuilderTask;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.CacheService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ActiveCompactionsTest extends CQLTester
+{
+ @Test
+ public void testSecondaryIndexTracking() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+ String idxName = createIndex("CREATE INDEX on %s(a)");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+
+ Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName);
+ Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+ SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables);
+
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.submitIndexBuild(builder, mockActiveCompactions).get();
+
+ assertTrue(mockActiveCompactions.finished);
+ assertNotNull(mockActiveCompactions.holder);
+ assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables());
+ }
+
+ @Test
+ public void testIndexSummaryRedistributionTracking() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+ try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstables, OperationType.INDEX_SUMMARY))
+ {
+ Map<TableId, LifecycleTransaction> transactions = ImmutableMap.<TableId, LifecycleTransaction>builder().put(getCurrentColumnFamilyStore().metadata().id, txn).build();
+ IndexSummaryRedistribution isr = new IndexSummaryRedistribution(new ArrayList<>(sstables), transactions, 1000);
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.runIndexSummaryRedistribution(isr, mockActiveCompactions);
+ assertTrue(mockActiveCompactions.finished);
+ assertNotNull(mockActiveCompactions.holder);
+ assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables());
+ }
+ }
+
+ @Test
+ public void testViewBuildTracking() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable()));
+ View view = Iterables.getOnlyElement(getCurrentColumnFamilyStore().viewManager);
+
+ Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ ViewBuilderTask vbt = new ViewBuilderTask(getCurrentColumnFamilyStore(), view, new Range<>(token, token), token, 0);
+
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.submitViewBuilder(vbt, mockActiveCompactions).get();
+ assertTrue(mockActiveCompactions.finished);
+ assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
+ // no sstables are tracked here, so this should stop for any predicate, even one that matches nothing
+ assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false));
+ }
+
+ @Test
+ public void testScrubOne() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+
+ SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
+ try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstable, OperationType.SCRUB))
+ {
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.scrubOne(getCurrentColumnFamilyStore(), txn, true, false, false, mockActiveCompactions);
+
+ assertTrue(mockActiveCompactions.finished);
+ assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable));
+ assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false));
+ assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));
+ }
+
+ }
+
+ @Test
+ public void testVerifyOne() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 5; i++)
+ {
+ execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+
+ SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), sstable, new Verifier.Options.Builder().build(), mockActiveCompactions);
+ assertTrue(mockActiveCompactions.finished);
+ assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable));
+ assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false));
+ assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));
+ }
+
+ @Test
+ public void testSubmitCacheWrite() throws ExecutionException, InterruptedException
+ {
+ AutoSavingCache.Writer writer = CacheService.instance.keyCache.getWriter(100);
+ MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+ CompactionManager.instance.submitCacheWrite(writer, mockActiveCompactions).get();
+ assertTrue(mockActiveCompactions.finished);
+ assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
+ }
+
+ private static class MockActiveCompactions implements ActiveCompactionsTracker
+ {
+ public CompactionInfo.Holder holder;
+ public boolean finished = false;
+ public void beginCompaction(CompactionInfo.Holder ci)
+ {
+ holder = ci;
+ }
+
+ public void finishCompaction(CompactionInfo.Holder ci)
+ {
+ finished = true;
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
new file mode 100644
index 0000000..4b05fc4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.repair.PendingAntiCompaction;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class CancelCompactionsTest
+{
+ @BeforeClass
+ public static void setup()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+ /**
+ * makes sure we only cancel compactions if the predicate says we have overlapping sstables
+ */
+ @Test
+ public void cancelTest() throws InterruptedException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+ Set<SSTableReader> toMarkCompacting = new HashSet<>(sstables.subList(0, 3));
+
+ TestCompactionTask tct = new TestCompactionTask(cfs, toMarkCompacting);
+ try
+ {
+ tct.start();
+
+ List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+ assertEquals(1, activeCompactions.size());
+ assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting);
+ // predicate requires the non-compacting sstables, should not cancel the one currently compacting:
+ cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false);
+ assertEquals(1, activeCompactions.size());
+ assertFalse(activeCompactions.get(0).isStopRequested());
+
+ // predicate requires the compacting ones - make sure stop is requested and that when we abort that
+ // compaction we actually run the callable (countdown the latch)
+ CountDownLatch cdl = new CountDownLatch(1);
+ Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false));
+ t.start();
+ while (!activeCompactions.get(0).isStopRequested())
+ Thread.sleep(100);
+
+ // cdl.countDown will not get executed until we have aborted all compactions for the sstables in toMarkCompacting
+ assertFalse(cdl.await(2, TimeUnit.SECONDS));
+ tct.abort();
+ // now the compactions are aborted and we can successfully wait for the latch
+ t.join();
+ assertTrue(cdl.await(2, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ tct.abort();
+ }
+ }
+
+ /**
+ * make sure we only cancel relevant compactions when there are multiple ongoing compactions
+ */
+ @Test
+ public void multipleCompactionsCancelTest() throws InterruptedException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+
+ List<TestCompactionTask> tcts = new ArrayList<>();
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 3))));
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(6, 9))));
+
+ try
+ {
+ tcts.forEach(TestCompactionTask::start);
+
+ List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+ assertEquals(2, activeCompactions.size());
+
+ Set<Set<SSTableReader>> compactingSSTables = new HashSet<>();
+ compactingSSTables.add(activeCompactions.get(0).getCompactionInfo().getSSTables());
+ compactingSSTables.add(activeCompactions.get(1).getCompactionInfo().getSSTables());
+ Set<Set<SSTableReader>> expectedSSTables = new HashSet<>();
+ expectedSSTables.add(new HashSet<>(sstables.subList(0, 3)));
+ expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
+ assertEquals(compactingSSTables, expectedSSTables);
+
+ cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false);
+ assertEquals(2, activeCompactions.size());
+ assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
+
+ CountDownLatch cdl = new CountDownLatch(1);
+ // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1)
+ Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false));
+ t.start();
+ activeCompactions = CompactionManager.instance.active.getCompactions();
+ assertEquals(2, activeCompactions.size());
+ Thread.sleep(500);
+ for (CompactionInfo.Holder holder : activeCompactions)
+ {
+ if (holder.getCompactionInfo().getSSTables().containsAll(sstables.subList(6, 9)))
+ assertTrue(holder.isStopRequested());
+ else
+ assertFalse(holder.isStopRequested());
+ }
+ tcts.get(1).abort();
+ assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+ cdl.await();
+ t.join();
+ }
+ finally
+ {
+ tcts.forEach(TestCompactionTask::abort);
+ }
+ }
+
+ /**
+ * Makes sure sub range compaction now only cancels the relevant compactions, not all of them
+ */
+ @Test
+ public void testSubrangeCompaction() throws InterruptedException
+ {
+ ColumnFamilyStore cfs = MockSchema.newCFS();
+ List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+
+ List<TestCompactionTask> tcts = new ArrayList<>();
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
+ tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
+ try
+ {
+ tcts.forEach(TestCompactionTask::start);
+
+ List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+ assertEquals(4, activeCompactions.size());
+ Range<Token> range = new Range<>(token(0), token(49));
+ Thread t = new Thread(() -> {
+ try
+ {
+ cfs.forceCompactionForTokenRange(Collections.singleton(range));
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+
+ t.start();
+
+ Thread.sleep(500);
+ assertEquals(4, CompactionManager.instance.active.getCompactions().size());
+ List<TestCompactionTask> toAbort = new ArrayList<>();
+ for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+ {
+ if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range))))
+ {
+ assertTrue(holder.isStopRequested());
+ for (TestCompactionTask tct : tcts)
+ if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
+ toAbort.add(tct);
+ }
+ else
+ assertFalse(holder.isStopRequested());
+ }
+ assertEquals(2, toAbort.size());
+ toAbort.forEach(TestCompactionTask::abort);
+ t.join();
+
+ }
+ finally
+ {
+ tcts.forEach(TestCompactionTask::abort);
+ }
+ }
+
+ /**
+  * Starts four compactions over unrepaired sstables plus one over already-repaired sstables, then runs a
+  * PendingAntiCompaction for the range built from token(-1) and token(49).
+  *
+  * Only the compactions that include an unrepaired, non-pending sstable intersecting that range must have
+  * a stop requested (the tasks over sstables 0-1 and sstable 3); every other active compaction, including
+  * the one over the repaired sstables, must be left alone.
+  */
+ @Test
+ public void testAnticompaction() throws InterruptedException, ExecutionException
+ {
+     ColumnFamilyStore cfs = MockSchema.newCFS();
+     List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+     // same token spans as `sstables`, generations 10-19, but marked repaired below
+     List<SSTableReader> alreadyRepairedSSTables = createSSTables(cfs, 10, 10);
+     for (SSTableReader sstable : alreadyRepairedSSTables)
+         AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis());
+     assertEquals(20, cfs.getLiveSSTables().size());
+     List<TestCompactionTask> tcts = new ArrayList<>();
+     tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
+     tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
+     tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
+     tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
+
+     List<TestCompactionTask> nonAffectedTcts = new ArrayList<>();
+     nonAffectedTcts.add(new TestCompactionTask(cfs, new HashSet<>(alreadyRepairedSSTables)));
+
+     // held in a local so the worker thread can be released in the finally block
+     java.util.concurrent.ExecutorService executor = Executors.newSingleThreadExecutor();
+     try
+     {
+         tcts.forEach(TestCompactionTask::start);
+         nonAffectedTcts.forEach(TestCompactionTask::start);
+         List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+         assertEquals(5, activeCompactions.size());
+         // make sure that sstables are fully contained so that the metadata gets mutated
+         Range<Token> range = new Range<>(token(-1), token(49));
+
+         UUID prsid = UUID.randomUUID();
+         ActiveRepairService.instance.registerParentRepairSession(prsid, InetAddressAndPort.getLocalHost(), Collections.singletonList(cfs), Collections.singleton(range), true, 1, true, PreviewKind.NONE);
+
+         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+         RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new Replica(local, range, true)).build();
+
+         PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), rae, executor);
+         Future<?> fut = pac.run();
+         // NOTE(review): timing based - presumably gives the anticompaction time to request the stops
+         Thread.sleep(600);
+         List<TestCompactionTask> toAbort = new ArrayList<>();
+         for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+         {
+             if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range)) && !sstable.isRepaired() && !sstable.isPendingRepair()))
+             {
+                 assertTrue(holder.isStopRequested());
+                 for (TestCompactionTask tct : tcts)
+                     if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
+                         toAbort.add(tct);
+             }
+             else
+                 assertFalse(holder.isStopRequested());
+         }
+         assertEquals(2, toAbort.size());
+         // the anticompaction can only proceed once the conflicting compactions have gone away
+         toAbort.forEach(TestCompactionTask::abort);
+         fut.get();
+         for (SSTableReader sstable : sstables)
+             assertTrue(!sstable.intersects(Collections.singleton(range)) || sstable.isPendingRepair());
+     }
+     finally
+     {
+         // non-blocking; done first so the thread is released even if an abort below throws
+         executor.shutdown();
+         tcts.forEach(TestCompactionTask::abort);
+         nonAffectedTcts.forEach(TestCompactionTask::abort);
+     }
+ }
+
+ /**
+  * Make sure index rebuilds get cancelled
+  *
+  * An index build is submitted whose key iterator parks inside hasNext() until the test releases it. While
+  * it is parked, the build must show up as the single active compaction without a stop request; after
+  * runWithCompactionsDisabled has run, the same holder must have a stop requested.
+  */
+ @Test
+ public void testIndexRebuild() throws ExecutionException, InterruptedException
+ {
+     ColumnFamilyStore cfs = MockSchema.newCFS();
+     List<SSTableReader> sstables = createSSTables(cfs, 5, 0);
+     Index idx = new StubIndex(cfs, null);
+     CountDownLatch indexBuildStarted = new CountDownLatch(1);
+     CountDownLatch indexBuildRunning = new CountDownLatch(1);
+     CountDownLatch compactionsStopped = new CountDownLatch(1);
+     ReducingKeyIterator reducingKeyIterator = new ReducingKeyIterator(sstables)
+     {
+         @Override
+         public boolean hasNext()
+         {
+             indexBuildStarted.countDown();
+             try
+             {
+                 indexBuildRunning.await();
+             }
+             catch (InterruptedException e)
+             {
+                 // keep the interrupt as the cause so a failure here can be diagnosed
+                 throw new RuntimeException(e);
+             }
+             return false;
+         }
+     };
+     Future<?> f = CompactionManager.instance.submitIndexBuild(new CollatedViewIndexBuilder(cfs, Collections.singleton(idx), reducingKeyIterator, ImmutableSet.copyOf(sstables)));
+     try
+     {
+         // wait for hasNext to get called
+         indexBuildStarted.await();
+         assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+         boolean foundCompaction = false;
+         for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+         {
+             if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
+             {
+                 assertFalse(holder.isStopRequested());
+                 foundCompaction = true;
+             }
+         }
+         assertTrue(foundCompaction);
+         cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false);
+         // wait for the runWithCompactionsDisabled callable
+         compactionsStopped.await();
+         assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+         foundCompaction = false;
+         for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+         {
+             if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
+             {
+                 assertTrue(holder.isStopRequested());
+                 foundCompaction = true;
+             }
+         }
+         assertTrue(foundCompaction);
+         // signal that the index build should be finished
+         indexBuildRunning.countDown();
+         f.get();
+         assertTrue(CompactionManager.instance.active.getCompactions().isEmpty());
+     }
+     finally
+     {
+         // release the parked index build even when an assertion above failed; a second countDown is a no-op
+         indexBuildRunning.countDown();
+     }
+ }
+
+ /**
+  * Returns the token value of the sstable's first key, cast to a long.
+  */
+ long first(SSTableReader sstable)
+ {
+     Object tokenValue = sstable.first.getToken().getTokenValue();
+     return (long) tokenValue;
+ }
+
+ /**
+  * Wraps the given long in a Murmur3Partitioner.LongToken.
+  */
+ Token token(long t)
+ {
+     Token wrapped = new Murmur3Partitioner.LongToken(t);
+     return wrapped;
+ }
+
+ /**
+  * Creates {@code count} mock sstables and adds them to the given cfs, after disabling auto compaction on it.
+  *
+  * The sstable at position i gets generation {@code startGeneration + i} and the token bounds
+  * {@code i * 10} to {@code i * 10 + 9}.
+  *
+  * @return the created sstables, in creation order
+  */
+ private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int count, int startGeneration)
+ {
+     List<SSTableReader> created = new ArrayList<>();
+     int generation = startGeneration;
+     for (int idx = 0; idx < count; idx++, generation++)
+     {
+         long lowerToken = idx * 10;
+         long upperToken = (idx + 1) * 10 - 1;
+         SSTableReader reader = MockSchema.sstable(generation, 0, true, lowerToken, upperToken, cfs);
+         created.add(reader);
+     }
+     cfs.disableAutoCompaction();
+     cfs.addSSTables(created);
+     return created;
+ }
+
+ /**
+  * Minimal stand-in for a running compaction over a fixed set of sstables.
+  *
+  * start() opens scanners, takes a COMPACTION transaction on the sstables and registers a CompactionIterator
+  * with CompactionManager.instance.active; abort() releases whatever start() managed to acquire.
+  */
+ private static class TestCompactionTask
+ {
+     private final ColumnFamilyStore cfs;
+     private final Set<SSTableReader> sstables;
+     private LifecycleTransaction txn;
+     private CompactionController controller;
+     private CompactionIterator ci;
+     private List<ISSTableScanner> scanners;
+
+     public TestCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
+     {
+         this.cfs = cfs;
+         this.sstables = sstables;
+     }
+
+     /**
+      * Acquires the resources and registers the iterator as an active compaction.
+      * Fails the test if the transaction over the sstables cannot be obtained.
+      */
+     public void start()
+     {
+         scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+         txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+         assertNotNull(txn);
+         controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE);
+         ci = new CompactionIterator(txn.opType(), scanners, controller, FBUtilities.nowInSeconds(), UUID.randomUUID());
+         CompactionManager.instance.active.beginCompaction(ci);
+     }
+
+     /**
+      * Closes or aborts everything start() created. Safe to call on a task that was never started or
+      * whose start() failed part-way, which happens when a test's finally block aborts all tasks.
+      */
+     public void abort()
+     {
+         if (controller != null)
+             controller.close();
+         if (ci != null)
+             ci.close();
+         if (txn != null)
+             txn.abort();
+         if (scanners != null)
+             scanners.forEach(ISSTableScanner::close);
+         // only unregister an iterator that was actually created; a null holder was never registered
+         if (ci != null)
+             CompactionManager.instance.active.finishCompaction(ci);
+     }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index 864ef3e..bb02dab 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
@@ -461,9 +462,9 @@ public class CompactionIteratorTest
}
@Override
- public String getBackingFiles()
+ public Set<SSTableReader> getBackingSSTables()
{
- return null;
+ return ImmutableSet.of();
}
}
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index 267c2e4..9f2bc2e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -252,7 +252,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
- compactionTask.execute(null);
+ compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertTrue(repairedContains(sstable));
Assert.assertFalse(unrepairedContains(sstable));
@@ -293,7 +293,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
- compactionTask.execute(null);
+ compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertFalse(repairedContains(sstable));
Assert.assertTrue(unrepairedContains(sstable));
@@ -330,7 +330,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
- compactionTask.execute(null);
+ compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
Assert.assertFalse(hasPendingStrategiesFor(repairID));
@@ -361,7 +361,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
// run the compaction
- compactionTask.execute(null);
+ compactionTask.execute(ActiveCompactionsTracker.NOOP);
Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
Assert.assertFalse(hasPendingStrategiesFor(repairID));
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
index 5370f33..af74603 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -86,7 +86,7 @@ public class CompactionTaskTest
cfs.getCompactionStrategyManager().pause();
try
{
- task.execute(CompactionManager.instance.getMetrics());
+ task.execute(CompactionManager.instance.active);
Assert.fail("Expected CompactionInterruptedException");
}
catch (CompactionInterruptedException e)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 857fa32..b003721 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -415,7 +415,7 @@ public class CompactionsCQLTest extends CQLTester
AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
// we should be compacting all 50 sstables:
assertEquals(50, act.transaction.originals().size());
- act.execute(null);
+ act.execute(ActiveCompactionsTracker.NOOP);
}
@Test
@@ -452,7 +452,7 @@ public class CompactionsCQLTest extends CQLTester
assertEquals(0, ((LeveledCompactionTask)act).getLevel());
assertTrue(act.transaction.originals().stream().allMatch(s -> s.getSSTableLevel() == 0));
txn.abort(); // unmark the l1 sstable compacting
- act.execute(null);
+ act.execute(ActiveCompactionsTracker.NOOP);
}
private void prepareWide() throws Throwable
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 4573485..dcd5270 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -305,7 +305,7 @@ public class CompactionsPurgeTest
cfs.forceBlockingFlush();
List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
assertEquals(1, tasks.size());
- tasks.get(0).execute(null);
+ tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
// verify that minor compaction does GC when key is provably not
// present in a non-compacted sstable
@@ -356,7 +356,7 @@ public class CompactionsPurgeTest
// compact the sstables with the c1/c2 data and the c1 tombstone
List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
assertEquals(1, tasks.size());
- tasks.get(0).execute(null);
+ tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
// We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
// sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index b1aaf8a..6c75e7b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -340,7 +340,7 @@ public class LeveledCompactionStrategyTest
waitForLeveling(cfs);
cfs.disableAutoCompaction();
- while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true))
Thread.sleep(100);
CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
@@ -437,7 +437,7 @@ public class LeveledCompactionStrategyTest
Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
cfs.forceCompactionForTokenRange(tokenRanges);
- while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true)) {
Thread.sleep(100);
}
@@ -450,7 +450,7 @@ public class LeveledCompactionStrategyTest
cfs.forceCompactionForTokenRange(tokenRanges2);
- while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+ while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true)) {
Thread.sleep(100);
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
index 1292b7e..61cf302 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -100,7 +100,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
// now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
- act.execute(null);
+ act.execute(ActiveCompactionsTracker.NOOP);
// now all sstables are laid out non-overlapping in L1, this means that the rest of the compactions
// will be single sstable ones, make sure that we use SingleSSTableLCSTask if singleSSTUplevel is true:
@@ -108,7 +108,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
{
act = lcs.getNextBackgroundTask(0);
assertEquals(singleSSTUplevel, act instanceof SingleSSTableLCSTask);
- act.execute(null);
+ act.execute(ActiveCompactionsTracker.NOOP);
}
assertEquals(0, lcs.getLevelSize(0));
int l1size = lcs.getLevelSize(1);
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 12a429b..385deac 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -50,13 +50,16 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.AbstractPendingRepairTest;
import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -65,6 +68,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
@@ -74,6 +78,11 @@ import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.Transactional;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
@@ -115,7 +124,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
}
cfs.forceBlockingFlush();
- Assert.assertEquals(2, cfs.getLiveSSTables().size());
+ assertEquals(2, cfs.getLiveSSTables().size());
Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
@@ -138,14 +147,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
executor.shutdown();
}
- Assert.assertEquals(3, cfs.getLiveSSTables().size());
+ assertEquals(3, cfs.getLiveSSTables().size());
int pendingRepair = 0;
for (SSTableReader sstable : cfs.getLiveSSTables())
{
if (sstable.isPendingRepair())
pendingRepair++;
}
- Assert.assertEquals(2, pendingRepair);
+ assertEquals(2, pendingRepair);
}
@Test
@@ -168,14 +177,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
Assert.assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
- Assert.assertEquals(3, result.txn.originals().size());
+ assertEquals(3, result.txn.originals().size());
for (SSTableReader sstable : expected)
{
logger.info("Checking {}", sstable);
- Assert.assertTrue(result.txn.originals().contains(sstable));
+ assertTrue(result.txn.originals().contains(sstable));
}
- Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
+ assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
result.abort();
}
@@ -186,11 +195,11 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
makeSSTables(2);
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
- Assert.assertEquals(2, sstables.size());
+ assertEquals(2, sstables.size());
SSTableReader repaired = sstables.get(0);
SSTableReader unrepaired = sstables.get(1);
- Assert.assertTrue(repaired.intersects(FULL_RANGE));
- Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+ assertTrue(repaired.intersects(FULL_RANGE));
+ assertTrue(unrepaired.intersects(FULL_RANGE));
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
repaired.reloadSSTableMetadata();
@@ -200,8 +209,8 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
Assert.assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
- Assert.assertEquals(1, result.txn.originals().size());
- Assert.assertTrue(result.txn.originals().contains(unrepaired));
+ assertEquals(1, result.txn.originals().size());
+ assertTrue(result.txn.originals().contains(unrepaired));
result.abort(); // release sstable refs
}
@@ -212,25 +221,25 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
makeSSTables(2);
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
- Assert.assertEquals(2, sstables.size());
+ assertEquals(2, sstables.size());
SSTableReader repaired = sstables.get(0);
SSTableReader unrepaired = sstables.get(1);
- Assert.assertTrue(repaired.intersects(FULL_RANGE));
- Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+ assertTrue(repaired.intersects(FULL_RANGE));
+ assertTrue(unrepaired.intersects(FULL_RANGE));
UUID sessionId = prepareSession();
LocalSessionAccessor.finalizeUnsafe(sessionId);
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
repaired.reloadSSTableMetadata();
- Assert.assertTrue(repaired.isPendingRepair());
+ assertTrue(repaired.isPendingRepair());
PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
Assert.assertNotNull(result);
logger.info("Originals: {}", result.txn.originals());
- Assert.assertEquals(1, result.txn.originals().size());
- Assert.assertTrue(result.txn.originals().contains(unrepaired));
+ assertEquals(1, result.txn.originals().size());
+ assertTrue(result.txn.originals().contains(unrepaired));
result.abort(); // releases sstable refs
}
@@ -241,16 +250,16 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
makeSSTables(2);
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
- Assert.assertEquals(2, sstables.size());
+ assertEquals(2, sstables.size());
SSTableReader repaired = sstables.get(0);
SSTableReader unrepaired = sstables.get(1);
- Assert.assertTrue(repaired.intersects(FULL_RANGE));
- Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+ assertTrue(repaired.intersects(FULL_RANGE));
+ assertTrue(unrepaired.intersects(FULL_RANGE));
UUID sessionId = prepareSession();
repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
repaired.reloadSSTableMetadata();
- Assert.assertTrue(repaired.isPendingRepair());
+ assertTrue(repaired.isPendingRepair());
PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
@@ -262,7 +271,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
cfs.disableAutoCompaction();
- Assert.assertEquals(0, cfs.getLiveSSTables().size());
+ assertEquals(0, cfs.getLiveSSTables().size());
PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
@@ -285,11 +294,11 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
Assert.assertNotNull(result);
InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
- Assert.assertTrue(cb.submittedCompactions.isEmpty());
+ assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result));
- Assert.assertEquals(1, cb.submittedCompactions.size());
- Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+ assertEquals(1, cb.submittedCompactions.size());
+ assertTrue(cb.submittedCompactions.contains(cfm.id));
}
/**
@@ -306,14 +315,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
Assert.assertNotNull(result);
- Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
+ assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
- Assert.assertTrue(cb.submittedCompactions.isEmpty());
+ assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result, null));
- Assert.assertTrue(cb.submittedCompactions.isEmpty());
- Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state());
+ assertTrue(cb.submittedCompactions.isEmpty());
+ assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state());
}
/**
@@ -334,12 +343,12 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
- Assert.assertTrue(cb.submittedCompactions.isEmpty());
+ assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result, fakeResult));
- Assert.assertEquals(1, cb.submittedCompactions.size());
- Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
- Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
+ assertEquals(1, cb.submittedCompactions.size());
+ assertTrue(cb.submittedCompactions.contains(cfm.id));
+ assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
}
@@ -398,7 +407,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
try
{
fut.get();
- Assert.fail("Should throw exception");
+ fail("Should throw exception");
}
catch(Throwable t)
{
@@ -406,7 +415,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
}
@Test
- public void testBlockedAcquisition() throws ExecutionException, InterruptedException
+ public void testBlockedAcquisition() throws ExecutionException, InterruptedException, TimeoutException
{
cfs.disableAutoCompaction();
ExecutorService es = Executors.newFixedThreadPool(1);
@@ -419,33 +428,27 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
CompactionController controller = new CompactionController(cfs, sstables, 0);
- CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.getMetrics()))
+ CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.active))
{
// `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s
// now we try to start a new AC, which will try to cancel all ongoing compactions
- CompactionManager.instance.getMetrics().beginCompaction(ci);
+ CompactionManager.instance.active.beginCompaction(ci);
PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
ListenableFuture fut = pac.run();
try
{
fut.get(30, TimeUnit.SECONDS);
+ fail("the future should throw exception since we try to start a new anticompaction when one is already running");
}
- catch (TimeoutException e)
- {
- // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled
- }
- Assert.assertTrue(ci.hasNext());
- ci.next(); // this would throw exception if the CompactionIterator was abortable
- try
+ catch (ExecutionException e)
{
- fut.get();
- Assert.fail("We should get exception when trying to start a new anticompaction with the same sstables");
+ assertTrue(e.getCause() instanceof PendingAntiCompaction.SSTableAcquisitionException);
}
- catch (Throwable t)
- {
- }
+ assertEquals(1, getCompactionsFor(cfs).size());
+ for (CompactionInfo.Holder holder : getCompactionsFor(cfs))
+ assertFalse(holder.isStopRequested());
}
}
finally
@@ -455,6 +458,17 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
}
}
+ /**
+  * Returns the currently active compactions whose table metadata equals that of the given cfs.
+  */
+ private List<CompactionInfo.Holder> getCompactionsFor(ColumnFamilyStore cfs)
+ {
+     return CompactionManager.instance.active.getCompactions()
+                                             .stream()
+                                             .filter(active -> active.getCompactionInfo().getTableMetadata().equals(cfs.metadata()))
+                                             .collect(Collectors.toList());
+ }
+
+
@Test
public void testUnblockedAcquisition() throws ExecutionException, InterruptedException
{
@@ -473,7 +487,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
// `ci` is our imaginary ongoing anticompaction which makes no progress until after 5s
// now we try to start a new AC, which will try to cancel all ongoing compactions
- CompactionManager.instance.getMetrics().beginCompaction(ci);
+ CompactionManager.instance.active.beginCompaction(ci);
PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
ListenableFuture fut = pac.run();
try
@@ -487,13 +501,13 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
}
try
{
- Assert.assertTrue(ci.hasNext());
+ assertTrue(ci.hasNext());
ci.next();
- Assert.fail("CompactionIterator should be abortable");
+ fail("CompactionIterator should be abortable");
}
catch (CompactionInterruptedException e)
{
- CompactionManager.instance.getMetrics().finishCompaction(ci);
+ CompactionManager.instance.active.finishCompaction(ci);
txn.abort();
// expected
}
@@ -509,7 +523,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
{
}
});
- Assert.assertTrue(cdl.await(1, TimeUnit.MINUTES));
+ assertTrue(cdl.await(1, TimeUnit.MINUTES));
}
}
finally
@@ -518,6 +532,73 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
}
}
+ /**
+  * Builds three groups of ten mock sstables over the same token spans (unrepaired, repaired and pending
+  * repair) and checks how AntiCompactionPredicate reacts when each group is reported as anticompacting.
+  */
+ @Test
+ public void testSSTablePredicateOngoingAntiCompaction()
+ {
+     ColumnFamilyStore cfs = MockSchema.newCFS();
+     cfs.disableAutoCompaction();
+     List<SSTableReader> sstables = new ArrayList<>();
+     List<SSTableReader> repairedSSTables = new ArrayList<>();
+     List<SSTableReader> pendingSSTables = new ArrayList<>();
+
+     // generations 1-10: plain unrepaired sstables
+     for (int gen = 1; gen <= 10; gen++)
+         sstables.add(MockSchema.sstable(gen, gen * 10, gen * 10 + 9, cfs));
+
+     // generations 11-20: same token spans, marked repaired
+     for (int gen = 11; gen <= 20; gen++)
+     {
+         int slot = gen - 10;
+         SSTableReader repaired = MockSchema.sstable(gen, slot * 10, slot * 10 + 9, cfs);
+         AbstractPendingRepairTest.mutateRepaired(repaired, System.currentTimeMillis());
+         repairedSSTables.add(repaired);
+     }
+
+     // generations 21-30: same token spans, each marked pending for its own random session
+     for (int gen = 21; gen <= 30; gen++)
+     {
+         int slot = gen - 20;
+         SSTableReader pending = MockSchema.sstable(gen, slot * 10, slot * 10 + 9, cfs);
+         AbstractPendingRepairTest.mutateRepaired(pending, UUID.randomUUID(), false);
+         pendingSSTables.add(pending);
+     }
+
+     // only the unrepaired and repaired groups are added to the cfs here
+     cfs.addSSTables(sstables);
+     cfs.addSSTables(repairedSSTables);
+
+     // if we are compacting the non-repaired non-pending sstables, we should get an error
+     tryPredicate(cfs, sstables, null, true);
+     // make sure we don't try to grab pending or repaired sstables;
+     tryPredicate(cfs, repairedSSTables, sstables, false);
+     tryPredicate(cfs, pendingSSTables, sstables, false);
+ }
+
+ /**
+  * Registers a fake ANTICOMPACTION over {@code compacting} as an active compaction, then filters the live
+  * sstables of {@code cfs} through an AntiCompactionPredicate for the range built from tokens 0 and 100.
+  *
+  * @param cfs          table whose live sstables are filtered
+  * @param compacting   sstables reported by the fake anticompaction
+  * @param expectedLive sstables the predicate is expected to accept; not read when {@code shouldFail} is true
+  * @param shouldFail   whether filtering is expected to throw SSTableAcquisitionException
+  */
+ private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> compacting, List<SSTableReader> expectedLive, boolean shouldFail)
+ {
+     CompactionInfo.Holder holder = new CompactionInfo.Holder()
+     {
+         public CompactionInfo getCompactionInfo()
+         {
+             return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, UUID.randomUUID(), compacting);
+         }
+     };
+     CompactionManager.instance.active.beginCompaction(holder);
+     try
+     {
+         PendingAntiCompaction.AntiCompactionPredicate predicate =
+         new PendingAntiCompaction.AntiCompactionPredicate(Collections.singleton(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(100))),
+                                                           UUID.randomUUID());
+         Set<SSTableReader> live = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
+         if (shouldFail)
+             fail("should fail - we try to grab already anticompacting sstables for anticompaction");
+         // expected value first, so a failure message reports the two sets the right way round
+         assertEquals(new HashSet<>(expectedLive), live);
+     }
+     catch (PendingAntiCompaction.SSTableAcquisitionException e)
+     {
+         if (!shouldFail)
+             fail("We should not fail filtering sstables");
+     }
+     finally
+     {
+         // always unregister the fake compaction so it cannot leak into the next call
+         CompactionManager.instance.active.finishCompaction(holder);
+     }
+ }
+
+
private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
{
RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 9a806c2..1c07760 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -28,6 +28,8 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import com.google.common.collect.ImmutableSet;
+
import org.apache.cassandra.index.TargetParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -593,8 +595,8 @@ public class CustomCassandraIndex implements Index
{
// interrupt in-progress compactions
Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
- CompactionManager.instance.interruptCompactionForCFs(cfss, true);
- CompactionManager.instance.waitForCessation(cfss);
+ CompactionManager.instance.interruptCompactionForCFs(cfss, (sstable) -> true, true);
+ CompactionManager.instance.waitForCessation(cfss, (sstable) -> true);
indexCfs.keyspace.writeOrder.awaitNewBarrier();
indexCfs.forceBlockingFlush();
indexCfs.readOrdering.awaitNewBarrier();
@@ -641,7 +643,8 @@ public class CustomCassandraIndex implements Index
SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
Collections.singleton(this),
- new ReducingKeyIterator(sstables));
+ new ReducingKeyIterator(sstables),
+ ImmutableSet.copyOf(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 186f0e8..ab8a710 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -111,7 +111,7 @@ public class IndexSummaryManagerTest
@After
public void afterTest()
{
- for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
+ for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
{
holder.stop();
}
@@ -659,7 +659,7 @@ public class IndexSummaryManagerTest
t.join();
assertNotNull("Expected compaction interrupted exception", exception.get());
- assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+ assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty());
Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 7a6b011..9ca2d6e 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -49,6 +49,8 @@ import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
public class MockSchema
{
static
@@ -127,7 +129,7 @@ public class MockSchema
}
SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
- .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
+ .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, null, false, header)
.get(MetadataType.STATS);
SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org