You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/01/17 07:02:11 UTC

[cassandra] branch trunk updated: Only cancel conflicting compactions when starting anticompactions and sub range compactions

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e27e65  Only cancel conflicting compactions when starting anticompactions and sub range compactions
6e27e65 is described below

commit 6e27e65f150ed6cc6a7205b9d7c89a763f3256fa
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Dec 18 17:01:02 2018 +0100

    Only cancel conflicting compactions when starting anticompactions and sub range compactions
    
    Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-14935
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |  12 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  13 +-
 src/java/org/apache/cassandra/db/Keyspace.java     |   2 +-
 .../db/compaction/AbstractCompactionTask.java      |   7 +-
 .../cassandra/db/compaction/ActiveCompactions.java |  66 ++++
 .../db/compaction/ActiveCompactionsTracker.java    |  34 ++
 .../cassandra/db/compaction/CompactionInfo.java    | 108 ++++--
 .../db/compaction/CompactionIterator.java          |  27 +-
 .../cassandra/db/compaction/CompactionManager.java | 149 +++++---
 .../cassandra/db/compaction/CompactionTask.java    |  15 +-
 .../db/compaction/LeveledCompactionStrategy.java   |   4 +-
 .../db/compaction/PendingRepairManager.java        |   2 +-
 .../cassandra/db/compaction/SSTableSplitter.java   |  17 +-
 .../apache/cassandra/db/compaction/Scrubber.java   |   4 +-
 .../db/compaction/SingleSSTableLCSTask.java        |   2 +-
 .../apache/cassandra/db/compaction/Verifier.java   |   4 +-
 .../db/repair/CassandraValidationIterator.java     |   8 +-
 .../cassandra/db/repair/PendingAntiCompaction.java | 150 ++++----
 .../apache/cassandra/db/view/ViewBuilderTask.java  |  12 +-
 src/java/org/apache/cassandra/index/Index.java     |   2 +-
 .../cassandra/index/internal/CassandraIndex.java   |   7 +-
 .../index/internal/CollatedViewIndexBuilder.java   |   9 +-
 .../cassandra/index/sasi/SASIIndexBuilder.java     |   3 +-
 .../cassandra/io/sstable/ISSTableScanner.java      |   4 +-
 .../io/sstable/IndexSummaryRedistribution.java     |   2 +-
 .../io/sstable/format/big/BigTableScanner.java     |  10 +-
 .../cassandra/metrics/CompactionMetrics.java       |  28 +-
 src/java/org/apache/cassandra/schema/Schema.java   |   2 +-
 .../db/compaction/LongCompactionsTest.java         |   2 +-
 .../LongLeveledCompactionStrategyTest.java         |   2 +-
 test/unit/org/apache/cassandra/Util.java           |   3 +-
 .../db/compaction/AbstractPendingRepairTest.java   |   6 +-
 .../db/compaction/ActiveCompactionsTest.java       | 194 ++++++++++
 .../db/compaction/CancelCompactionsTest.java       | 421 +++++++++++++++++++++
 .../db/compaction/CompactionIteratorTest.java      |   5 +-
 ...CompactionStrategyManagerPendingRepairTest.java |   8 +-
 .../db/compaction/CompactionTaskTest.java          |   2 +-
 .../db/compaction/CompactionsCQLTest.java          |   4 +-
 .../db/compaction/CompactionsPurgeTest.java        |   4 +-
 .../compaction/LeveledCompactionStrategyTest.java  |   6 +-
 .../db/compaction/SingleSSTableLCSTaskTest.java    |   4 +-
 .../db/repair/PendingAntiCompactionTest.java       | 189 ++++++---
 .../index/internal/CustomCassandraIndex.java       |   9 +-
 .../io/sstable/IndexSummaryManagerTest.java        |   4 +-
 .../org/apache/cassandra/schema/MockSchema.java    |   4 +-
 46 files changed, 1222 insertions(+), 349 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0e1da11..f44ac72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935)
  * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
  * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
  * Make antiCompactGroup throw exception on error and anticompaction non cancellable
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index d30b292..d9827e1 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -316,12 +316,12 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             else
                 type = OperationType.UNKNOWN;
 
-            info = new CompactionInfo(TableMetadata.minimal(SchemaConstants.SYSTEM_KEYSPACE_NAME, cacheType.toString()),
-                                      type,
-                                      0,
-                                      keysEstimate,
-                                      Unit.KEYS,
-                                      UUIDGen.getTimeUUID());
+            info = CompactionInfo.withoutSSTables(TableMetadata.minimal(SchemaConstants.SYSTEM_KEYSPACE_NAME, cacheType.toString()),
+                                                  type,
+                                                  0,
+                                                  keysEstimate,
+                                                  Unit.KEYS,
+                                                  UUIDGen.getTimeUUID());
         }
 
         public CacheService.CacheType cacheType()
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cc6af6f..adac934 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2185,6 +2185,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews)
     {
+        return runWithCompactionsDisabled(callable, (sstable) -> true, interruptValidation, interruptViews);
+    }
+
+    public <V> V runWithCompactionsDisabled(Callable<V> callable, Predicate<SSTableReader> sstablesPredicate, boolean interruptValidation, boolean interruptViews)
+    {
         // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
         // and so we only run one major compaction at a time
         synchronized (this)
@@ -2196,17 +2201,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                : concatWithIndexes();
 
             for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
-                cfs.getCompactionStrategyManager().pause();
+                cfs.getCompactionStrategyManager().pause(); // todo: make sure anticompaction pauses!
             try
             {
                 // interrupt in-progress compactions
-                CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation);
-                CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs);
+                CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, sstablesPredicate, interruptValidation);
+                CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs, sstablesPredicate);
 
                 // doublecheck that we finished, instead of timing out
                 for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs)
                 {
-                    if (!cfs.getTracker().getCompacting().isEmpty())
+                    if (cfs.getTracker().getCompacting().stream().anyMatch(sstablesPredicate))
                     {
                         logger.warn("Unable to cancel in-progress compactions for {}.  Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.name);
                         return null;
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 33c0f32..bc382ee 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -384,7 +384,7 @@ public class Keyspace
             return;
 
         cfs.getCompactionStrategyManager().shutdown();
-        CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), true);
+        CompactionManager.instance.interruptCompactionForCFs(cfs.concatWithIndexes(), (sstable) -> true, true);
         // wait for any outstanding reads/writes that might affect the CFS
         cfs.keyspace.writeOrder.awaitNewBarrier();
         cfs.readOrdering.awaitNewBarrier();
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index c542a51..989c21c 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -25,7 +25,6 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.io.FSDiskFullWriteError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -94,11 +93,11 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
     /**
      * executes the task and unmarks sstables compacting
      */
-    public int execute(CompactionExecutorStatsCollector collector)
+    public int execute(ActiveCompactionsTracker activeCompactions)
     {
         try
         {
-            return executeInternal(collector);
+            return executeInternal(activeCompactions);
         }
         catch(FSDiskFullWriteError e)
         {
@@ -113,7 +112,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
     }
     public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
 
-    protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
+    protected abstract int executeInternal(ActiveCompactionsTracker activeCompactions);
 
     public AbstractCompactionTask setUserDefined(boolean isUserDefined)
     {
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
new file mode 100644
index 0000000..b289ef9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+/**
+ * Registry of the compaction-type operations currently running, held as {@link CompactionInfo.Holder}s.
+ * Holders are added by {@link #beginCompaction} and removed by {@link #finishCompaction}.
+ */
+public class ActiveCompactions implements ActiveCompactionsTracker
+{
+    // synchronized, identity-based set of the holders of all currently registered operations
+    private final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+
+    /**
+     * @return a snapshot copy of the currently registered holders; later changes are not reflected in it
+     */
+    public List<CompactionInfo.Holder> getCompactions()
+    {
+        return new ArrayList<>(compactions);
+    }
+
+    /**
+     * Registers {@code ci} as running.
+     */
+    public void beginCompaction(CompactionInfo.Holder ci)
+    {
+        compactions.add(ci);
+    }
+
+    /**
+     * Unregisters {@code ci} and updates the bytesCompacted and totalCompactionsCompleted metrics.
+     */
+    public void finishCompaction(CompactionInfo.Holder ci)
+    {
+        compactions.remove(ci);
+        CompactionManager.instance.getMetrics().bytesCompacted.inc(ci.getCompactionInfo().getTotal());
+        CompactionManager.instance.getMetrics().totalCompactionsCompleted.mark();
+    }
+
+    /**
+     * Finds the registered operation that includes the given sstable.
+     *
+     * @param sstable the sstable to look for
+     * @return the compaction info of the single operation whose sstables contain {@code sstable},
+     *         or null if there is none
+     * @throws IllegalStateException if more than one registered operation contains the sstable
+     */
+    public CompactionInfo getCompactionForSSTable(SSTableReader sstable)
+    {
+        CompactionInfo toReturn = null;
+        // Iterate over a snapshot: a Collections.synchronizedSet must not be iterated without holding its
+        // lock, otherwise a concurrent add/remove can break the traversal (ConcurrentModificationException).
+        for (CompactionInfo.Holder holder : getCompactions())
+        {
+            // todo: change compactions datastructure to avoid iterating all active compactions
+            CompactionInfo info = holder.getCompactionInfo();
+            if (info.getSSTables().contains(sstable))
+            {
+                if (toReturn != null)
+                    throw new IllegalStateException("SSTable "+sstable+" involved in several compactions");
+                toReturn = info;
+            }
+        }
+        return toReturn;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java
new file mode 100644
index 0000000..c1bbbd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactionsTracker.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+/**
+ * Callback used to announce when a compaction-type operation begins and finishes.
+ */
+public interface ActiveCompactionsTracker
+{
+    /**
+     * Notifies the tracker that the operation represented by {@code ci} begins.
+     */
+    void beginCompaction(CompactionInfo.Holder ci);
+
+    /**
+     * Notifies the tracker that the operation represented by {@code ci} has finished.
+     */
+    void finishCompaction(CompactionInfo.Holder ci);
+
+    /**
+     * Tracker that ignores every notification.
+     */
+    ActiveCompactionsTracker NOOP = new ActiveCompactionsTracker()
+    {
+        @Override
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+        }
+
+        @Override
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+        }
+    };
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 99df259..7c950c0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -17,19 +17,22 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.Serializable;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.function.Predicate;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableMetadata;
 
-/** Implements serializable to allow structured info to be returned via JMX. */
-public final class CompactionInfo implements Serializable
+public final class CompactionInfo
 {
-    private static final long serialVersionUID = 3695381572726744816L;
-
     public static final String ID = "id";
     public static final String KEYSPACE = "keyspace";
     public static final String COLUMNFAMILY = "columnfamily";
@@ -38,6 +41,7 @@ public final class CompactionInfo implements Serializable
     public static final String TASK_TYPE = "taskType";
     public static final String UNIT = "unit";
     public static final String COMPACTION_ID = "compactionId";
+    public static final String SSTABLES = "sstables";
 
     private final TableMetadata metadata;
     private final OperationType tasktype;
@@ -45,41 +49,19 @@ public final class CompactionInfo implements Serializable
     private final long total;
     private final Unit unit;
     private final UUID compactionId;
+    private final ImmutableSet<SSTableReader> sstables;
 
-    public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
+    public CompactionInfo(TableMetadata metadata, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId, Collection<SSTableReader> sstables)
     {
-        this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId);
+        this(metadata, tasktype, bytesComplete, totalBytes, Unit.BYTES, compactionId, sstables);
     }
 
-    public static enum Unit
+    public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables)
     {
-        BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
-
-        private final String name;
-
-        private Unit(String name)
-        {
-            this.name = name;
-        }
-
-        @Override
-        public String toString()
-        {
-            return this.name;
-        }
-
-        public static boolean isFileSize(String unit)
-        {
-            return BYTES.toString().equals(unit);
-        }
-    }
-
-    public CompactionInfo(OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
-    {
-        this(null, tasktype, completed, total, unit, compactionId);
+        this(null, tasktype, completed, total, unit, compactionId, sstables);
     }
 
-    public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
+    private CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId, Collection<SSTableReader> sstables)
     {
         this.tasktype = tasktype;
         this.completed = completed;
@@ -87,12 +69,22 @@ public final class CompactionInfo implements Serializable
         this.metadata = metadata;
         this.unit = unit;
         this.compactionId = compactionId;
+        this.sstables = ImmutableSet.copyOf(sstables);
+    }
+
+    /**
+     * Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask and AutoSavingCache where we don't know
+     * the sstables at construction
+     */
+    public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, UUID compactionId)
+    {
+        return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of());
     }
 
     /** @return A copy of this CompactionInfo with updated progress. */
     public CompactionInfo forProgress(long complete, long total)
     {
-        return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId);
+        return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables);
     }
 
     public Optional<String> getKeyspace()
@@ -135,10 +127,16 @@ public final class CompactionInfo implements Serializable
         return unit;
     }
 
+    public Set<SSTableReader> getSSTables()
+    {
+        return sstables;
+    }
+
     public String toString()
     {
         StringBuilder buff = new StringBuilder();
         buff.append(getTaskType());
+
         if (metadata != null)
         {
             buff.append('@').append(metadata.id).append('(');
@@ -148,8 +146,13 @@ public final class CompactionInfo implements Serializable
         {
             buff.append('(');
         }
-        buff.append(getCompleted()).append('/').append(getTotal());
-        return buff.append(')').append(unit).toString();
+        buff.append(getCompleted())
+            .append('/')
+            .append(getTotal())
+            .append(')')
+            .append(unit);
+
+        return buff.toString();
     }
 
     public Map<String, String> asMap()
@@ -163,9 +166,19 @@ public final class CompactionInfo implements Serializable
         ret.put(TASK_TYPE, tasktype.toString());
         ret.put(UNIT, unit.toString());
         ret.put(COMPACTION_ID, compactionId == null ? "" : compactionId.toString());
+        ret.put(SSTABLES, Joiner.on(',').join(sstables));
         return ret;
     }
 
+    /**
+     * An info without sstables (see withoutSSTables) must always be stopped; otherwise
+     * stop only when at least one of the tracked sstables matches the predicate.
+     */
+    boolean shouldStop(Predicate<SSTableReader> sstablePredicate)
+    {
+        return sstables.isEmpty() || sstables.stream().anyMatch(sstablePredicate);
+    }
+
     public static abstract class Holder
     {
         private volatile boolean stopRequested = false;
@@ -181,4 +194,27 @@ public final class CompactionInfo implements Serializable
             return stopRequested;
         }
     }
+
+    public enum Unit
+    {
+        BYTES("bytes"), RANGES("token range parts"), KEYS("keys");
+
+        private final String name;
+
+        Unit(String name)
+        {
+            this.name = name;
+        }
+
+        @Override
+        public String toString()
+        {
+            return this.name;
+        }
+
+        public static boolean isFileSize(String unit)
+        {
+            return BYTES.toString().equals(unit);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index c73520d..0aba594 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.function.LongPredicate;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Ordering;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -32,7 +34,6 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.index.transactions.CompactionTransaction;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 
 /**
@@ -58,6 +59,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     private final OperationType type;
     private final CompactionController controller;
     private final List<ISSTableScanner> scanners;
+    private final ImmutableSet<SSTableReader> sstables;
     private final int nowInSec;
     private final UUID compactionId;
 
@@ -73,20 +75,20 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     private final long[] mergeCounters;
 
     private final UnfilteredPartitionIterator compacted;
-    private final CompactionMetrics metrics;
+    private final ActiveCompactionsTracker activeCompactions;
 
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
     {
-        this(type, scanners, controller, nowInSec, compactionId, null, true);
+        this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP, true);
     }
 
-    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics)
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
     {
-        this(type, scanners, controller, nowInSec, compactionId, metrics, true);
+        this(type, scanners, controller, nowInSec, compactionId, activeCompactions, true);
     }
 
     @SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
-    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, CompactionMetrics metrics, boolean abortable)
+    public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions, boolean abortable)
     {
         this.controller = controller;
         this.type = type;
@@ -100,10 +102,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             bytes += scanner.getLengthInBytes();
         this.totalBytes = bytes;
         this.mergeCounters = new long[scanners.size()];
-        this.metrics = metrics;
-
-        if (metrics != null)
-            metrics.beginCompaction(this);
+        this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
+        this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions
 
         UnfilteredPartitionIterator merged = scanners.isEmpty()
                                            ? EmptyIterators.unfilteredPartition(controller.cfs.metadata())
@@ -114,6 +114,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             compacted = Transformation.apply(merged, new AbortableUnfilteredPartitionTransformation(this));
         else
             compacted = merged;
+        sstables = scanners.stream().map(ISSTableScanner::getBackingSSTables).flatMap(Collection::stream).collect(ImmutableSet.toImmutableSet());
     }
 
     public TableMetadata metadata()
@@ -127,7 +128,8 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                                   type,
                                   bytesRead,
                                   totalBytes,
-                                  compactionId);
+                                  compactionId,
+                                  sstables);
     }
 
     private void updateCounterFor(int rows)
@@ -254,8 +256,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         }
         finally
         {
-            if (metrics != null)
-                metrics.finishCompaction(this);
+            activeCompactions.finishCompaction(this);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3eebd75..5998983 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -128,6 +128,8 @@ public class CompactionManager implements CompactionManagerMBean
     @VisibleForTesting
     final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
+    public final ActiveCompactions active = new ActiveCompactions();
+
     private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
     public CompactionMetrics getMetrics()
@@ -203,10 +205,10 @@ public class CompactionManager implements CompactionManagerMBean
         return futures;
     }
 
-    public boolean isCompacting(Iterable<ColumnFamilyStore> cfses)
+    public boolean isCompacting(Iterable<ColumnFamilyStore> cfses, Predicate<SSTableReader> sstablePredicate)
     {
         for (ColumnFamilyStore cfs : cfses)
-            if (!cfs.getTracker().getCompacting().isEmpty())
+            if (cfs.getTracker().getCompacting().stream().anyMatch(sstablePredicate))
                 return true;
         return false;
     }
@@ -224,7 +226,7 @@ public class CompactionManager implements CompactionManagerMBean
         cacheCleanupExecutor.shutdown();
 
         // interrupt compactions and validations
-        for (Holder compactionHolder : CompactionMetrics.getCompactions())
+        for (Holder compactionHolder : active.getCompactions())
         {
             compactionHolder.stop();
         }
@@ -286,7 +288,7 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 else
                 {
-                    task.execute(metrics);
+                    task.execute(active);
                     ranCompaction = true;
                 }
             }
@@ -308,7 +310,7 @@ public class CompactionManager implements CompactionManagerMBean
                     AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask();
                     if (upgradeTask != null)
                     {
-                        upgradeTask.execute(metrics);
+                        upgradeTask.execute(active);
                         return true;
                     }
                 }
@@ -443,7 +445,7 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction input)
             {
-                scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL);
+                scrubOne(cfs, input, skipCorrupted, checkData, reinsertOverflowedTTL, active);
             }
         }, jobs, OperationType.SCRUB);
     }
@@ -462,7 +464,7 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public void execute(LifecycleTransaction input)
             {
-                verifyOne(cfs, input.onlyOne(), options);
+                verifyOne(cfs, input.onlyOne(), options, active);
             }
         }, 0, OperationType.VERIFY);
     }
@@ -495,7 +497,7 @@ public class CompactionManager implements CompactionManagerMBean
                 AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
                 task.setUserDefined(true);
                 task.setCompactionType(OperationType.UPGRADE_SSTABLES);
-                task.execute(metrics);
+                task.execute(active);
             }
         }, jobs, OperationType.UPGRADE_SSTABLES);
     }
@@ -566,7 +568,7 @@ public class CompactionManager implements CompactionManagerMBean
                 };
                 task.setUserDefined(true);
                 task.setCompactionType(OperationType.GARBAGE_COLLECT);
-                task.execute(metrics);
+                task.execute(active);
             }
         }, jobs, OperationType.GARBAGE_COLLECT);
     }
@@ -638,7 +640,7 @@ public class CompactionManager implements CompactionManagerMBean
                 AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
                 task.setUserDefined(true);
                 task.setCompactionType(OperationType.RELOCATE);
-                task.execute(metrics);
+                task.execute(active);
             }
         }, jobs, OperationType.RELOCATE);
     }
@@ -831,7 +833,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 protected void runMayThrow()
                 {
-                    task.execute(metrics);
+                    task.execute(active);
                 }
             };
 
@@ -857,7 +859,7 @@ public class CompactionManager implements CompactionManagerMBean
                            return null;
                        }
                        return cfStore.getCompactionStrategyManager().getUserDefinedTasks(sstables, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()));
-                   }, false, false);
+                   }, (sstable) -> new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), false, false);
 
         if (tasks == null)
             return;
@@ -868,7 +870,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 for (AbstractCompactionTask task : tasks)
                     if (task != null)
-                        task.execute(metrics);
+                        task.execute(active);
             }
         };
 
@@ -1011,7 +1013,7 @@ public class CompactionManager implements CompactionManagerMBean
                     for (AbstractCompactionTask task : tasks)
                     {
                         if (task != null)
-                            task.execute(metrics);
+                            task.execute(active);
                     }
                 }
             }
@@ -1047,37 +1049,39 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL)
+    @VisibleForTesting
+    void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, ActiveCompactionsTracker activeCompactions)
     {
         CompactionInfo.Holder scrubInfo = null;
 
         try (Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, checkData, reinsertOverflowedTTL))
         {
             scrubInfo = scrubber.getScrubInfo();
-            metrics.beginCompaction(scrubInfo);
+            activeCompactions.beginCompaction(scrubInfo);
             scrubber.scrub();
         }
         finally
         {
             if (scrubInfo != null)
-                metrics.finishCompaction(scrubInfo);
+                activeCompactions.finishCompaction(scrubInfo);
         }
     }
 
-    private void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options)
+    @VisibleForTesting
+    void verifyOne(ColumnFamilyStore cfs, SSTableReader sstable, Verifier.Options options, ActiveCompactionsTracker activeCompactions)
     {
         CompactionInfo.Holder verifyInfo = null;
 
         try (Verifier verifier = new Verifier(cfs, sstable, false, options))
         {
             verifyInfo = verifier.getVerifyInfo();
-            metrics.beginCompaction(verifyInfo);
+            activeCompactions.beginCompaction(verifyInfo);
             verifier.verify();
         }
         finally
         {
             if (verifyInfo != null)
-                metrics.finishCompaction(verifyInfo);
+                activeCompactions.finishCompaction(verifyInfo);
         }
     }
 
@@ -1198,7 +1202,7 @@ public class CompactionManager implements CompactionManagerMBean
              ISSTableScanner scanner = cleanupStrategy.getScanner(sstable);
              CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              Refs<SSTableReader> refs = Refs.ref(Collections.singleton(sstable));
-             CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+             CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), active))
         {
             StatsMetadata metadata = sstable.getSSTableMetadata();
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, txn));
@@ -1495,7 +1499,7 @@ public class CompactionManager implements CompactionManagerMBean
 
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
              CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
-             CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
+             CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), active))
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
@@ -1556,28 +1560,26 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     @VisibleForTesting
-    public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, CompactionMetrics metrics)
+    public static CompactionIterator getAntiCompactionIterator(List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID timeUUID, ActiveCompactionsTracker activeCompactions)
     {
-        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, metrics, false);
+        return new CompactionIterator(OperationType.ANTICOMPACTION, scanners, controller, nowInSec, timeUUID, activeCompactions, false);
     }
 
-    /**
-     * Is not scheduled, because it is performing disjoint work from sstable compaction.
-     */
-    public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+    @VisibleForTesting
+    ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder, ActiveCompactionsTracker activeCompactions)
     {
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                metrics.beginCompaction(builder);
+                activeCompactions.beginCompaction(builder);
                 try
                 {
                     builder.build();
                 }
                 finally
                 {
-                    metrics.finishCompaction(builder);
+                    activeCompactions.finishCompaction(builder);
                 }
             }
         };
@@ -1585,8 +1587,21 @@ public class CompactionManager implements CompactionManagerMBean
         return executor.submitIfRunning(runnable, "index build");
     }
 
+    /**
+     * Is not scheduled, because it is performing disjoint work from sstable compaction.
+     */
+    public ListenableFuture<?> submitIndexBuild(final SecondaryIndexBuilder builder)
+    {
+        return submitIndexBuild(builder, active);
+    }
+
     public Future<?> submitCacheWrite(final AutoSavingCache.Writer writer)
     {
+        return submitCacheWrite(writer, active);
+    }
+
+    Future<?> submitCacheWrite(final AutoSavingCache.Writer writer, ActiveCompactionsTracker activeCompactions)
+    {
         Runnable runnable = new Runnable()
         {
             public void run()
@@ -1598,14 +1613,14 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 try
                 {
-                    metrics.beginCompaction(writer);
+                    activeCompactions.beginCompaction(writer);
                     try
                     {
                         writer.saveCache();
                     }
                     finally
                     {
-                        metrics.finishCompaction(writer);
+                        activeCompactions.finishCompaction(writer);
                     }
                 }
                 finally
@@ -1620,15 +1635,20 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution) throws IOException
     {
-        metrics.beginCompaction(redistribution);
+        return runIndexSummaryRedistribution(redistribution, active);
+    }
 
+    @VisibleForTesting
+    List<SSTableReader> runIndexSummaryRedistribution(IndexSummaryRedistribution redistribution, ActiveCompactionsTracker activeCompactions) throws IOException
+    {
+        activeCompactions.beginCompaction(redistribution);
         try
         {
             return redistribution.redistributeSummaries();
         }
         finally
         {
-            metrics.finishCompaction(redistribution);
+            activeCompactions.finishCompaction(redistribution);
         }
     }
 
@@ -1641,22 +1661,28 @@ public class CompactionManager implements CompactionManagerMBean
 
     public ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task)
     {
+        return submitViewBuilder(task, active);
+    }
+
+    @VisibleForTesting
+    ListenableFuture<Long> submitViewBuilder(final ViewBuilderTask task, ActiveCompactionsTracker activeCompactions)
+    {
         return viewBuildExecutor.submitIfRunning(() -> {
-            metrics.beginCompaction(task);
+            activeCompactions.beginCompaction(task);
             try
             {
                 return task.call();
             }
             finally
             {
-                metrics.finishCompaction(task);
+                activeCompactions.finishCompaction(task);
             }
         }, "view build");
     }
 
     public int getActiveCompactions()
     {
-        return CompactionMetrics.getCompactions().size();
+        return active.getCompactions().size();
     }
 
     static class CompactionExecutor extends JMXEnabledThreadPoolExecutor
@@ -1779,13 +1805,6 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public interface CompactionExecutorStatsCollector
-    {
-        void beginCompaction(CompactionInfo.Holder ci);
-
-        void finishCompaction(CompactionInfo.Holder ci);
-    }
-
     public void incrementAborted()
     {
         metrics.compactionsAborted.inc();
@@ -1804,7 +1823,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<Map<String, String>> getCompactions()
     {
-        List<Holder> compactionHolders = CompactionMetrics.getCompactions();
+        List<Holder> compactionHolders = active.getCompactions();
         List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size());
         for (CompactionInfo.Holder ci : compactionHolders)
             out.add(ci.getCompactionInfo().asMap());
@@ -1813,7 +1832,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<String> getCompactionSummary()
     {
-        List<Holder> compactionHolders = CompactionMetrics.getCompactions();
+        List<Holder> compactionHolders = active.getCompactions();
         List<String> out = new ArrayList<String>(compactionHolders.size());
         for (CompactionInfo.Holder ci : compactionHolders)
             out.add(ci.getCompactionInfo().toString());
@@ -1855,7 +1874,7 @@ public class CompactionManager implements CompactionManagerMBean
     public void stopCompaction(String type)
     {
         OperationType operation = OperationType.valueOf(type);
-        for (Holder holder : CompactionMetrics.getCompactions())
+        for (Holder holder : active.getCompactions())
         {
             if (holder.getCompactionInfo().getTaskType() == operation)
                 holder.stop();
@@ -1864,7 +1883,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public void stopCompactionById(String compactionId)
     {
-        for (Holder holder : CompactionMetrics.getCompactions())
+        for (Holder holder : active.getCompactions())
         {
             UUID holderId = holder.getCompactionInfo().getTaskId();
             if (holderId != null && holderId.equals(UUID.fromString(compactionId)))
@@ -2004,41 +2023,45 @@ public class CompactionManager implements CompactionManagerMBean
      * isCompacting if you want that behavior.
      *
      * @param columnFamilies The ColumnFamilies to try to stop compaction upon.
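+     * @param sstablePredicate predicate over sstables; it is passed to {@code CompactionInfo.shouldStop} to decide whether a compaction on one of the given tables gets stopped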
      * @param interruptValidation true if validation operations for repair should also be interrupted
-     *
      */
-    public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, boolean interruptValidation)
+    public void interruptCompactionFor(Iterable<TableMetadata> columnFamilies, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
     {
         assert columnFamilies != null;
 
         // interrupt in-progress compactions
-        for (Holder compactionHolder : CompactionMetrics.getCompactions())
+        for (Holder compactionHolder : active.getCompactions())
         {
             CompactionInfo info = compactionHolder.getCompactionInfo();
             if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
                 continue;
 
             if (Iterables.contains(columnFamilies, info.getTableMetadata()))
-                compactionHolder.stop(); // signal compaction to stop
+            {
+                if (info.shouldStop(sstablePredicate))
+                    compactionHolder.stop();
+            }
         }
     }
 
-    public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation)
+    public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate, boolean interruptValidation)
     {
         List<TableMetadata> metadata = new ArrayList<>();
         for (ColumnFamilyStore cfs : cfss)
             metadata.add(cfs.metadata());
 
-        interruptCompactionFor(metadata, interruptValidation);
+        interruptCompactionFor(metadata, sstablePredicate, interruptValidation);
     }
 
-    public void waitForCessation(Iterable<ColumnFamilyStore> cfss)
+    public void waitForCessation(Iterable<ColumnFamilyStore> cfss, Predicate<SSTableReader> sstablePredicate)
     {
         long start = System.nanoTime();
         long delay = TimeUnit.MINUTES.toNanos(1);
+
         while (System.nanoTime() - start < delay)
         {
-            if (CompactionManager.instance.isCompacting(cfss))
+            if (CompactionManager.instance.isCompacting(cfss, sstablePredicate))
                 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
             else
                 break;
@@ -2047,12 +2070,12 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<CompactionInfo> getSSTableTasks()
     {
-        return CompactionMetrics.getCompactions()
-                                .stream()
-                                .map(CompactionInfo.Holder::getCompactionInfo)
-                                .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
-                                             && task.getTaskType() != OperationType.KEY_CACHE_SAVE
-                                             && task.getTaskType() != OperationType.ROW_CACHE_SAVE)
-                                .collect(Collectors.toList());
+        return active.getCompactions()
+                     .stream()
+                     .map(CompactionInfo.Holder::getCompactionInfo)
+                     .filter(task -> task.getTaskType() != OperationType.COUNTER_CACHE_SAVE
+                                     && task.getTaskType() != OperationType.KEY_CACHE_SAVE
+                                     && task.getTaskType() != OperationType.ROW_CACHE_SAVE)
+                     .collect(Collectors.toList());
     }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 591b7c4..685da2e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -53,7 +52,7 @@ public class CompactionTask extends AbstractCompactionTask
     protected final int gcBefore;
     protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
-    private CompactionExecutorStatsCollector collector;
+    private ActiveCompactionsTracker activeCompactions;
 
     public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore)
     {
@@ -78,9 +77,9 @@ public class CompactionTask extends AbstractCompactionTask
         return totalBytesCompacted += bytesCompacted;
     }
 
-    protected int executeInternal(CompactionExecutorStatsCollector collector)
+    protected int executeInternal(ActiveCompactionsTracker activeCompactions)
     {
-        this.collector = collector;
+        this.activeCompactions = activeCompactions == null ? ActiveCompactionsTracker.NOOP : activeCompactions;
         run();
         return transaction.originals().size();
     }
@@ -188,8 +187,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (!controller.cfs.getCompactionStrategyManager().isActive())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                if (collector != null)
-                    collector.beginCompaction(ci);
+                activeCompactions.beginCompaction(ci);
 
                 try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                 {
@@ -219,11 +217,8 @@ public class CompactionTask extends AbstractCompactionTask
                 }
                 finally
                 {
-                    if (collector != null)
-                        collector.finishCompaction(ci);
-
+                    activeCompactions.finishCompaction(ci);
                     mergedRowCounts = ci.getMergedRowCounts();
-
                     totalSourceCQLRows = ci.getTotalSourceCQLRows();
                 }
             }
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3437094..74ffccb 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -493,9 +493,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             return currentScanner == null ? totalBytesScanned : totalBytesScanned + currentScanner.getBytesScanned();
         }
 
-        public String getBackingFiles()
+        public Set<SSTableReader> getBackingSSTables()
         {
-            return Joiner.on(", ").join(sstables);
+            return ImmutableSet.copyOf(sstables);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 1aa156f..b2d70f7 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -479,7 +479,7 @@ class PendingRepairManager
             throw new UnsupportedOperationException();
         }
 
-        protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+        protected int executeInternal(ActiveCompactionsTracker activeCompactions)
         {
             run();
             return transaction.originals().size();
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index e9ae429..1746d7c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -31,8 +31,6 @@ public class SSTableSplitter
 {
     private final SplittingCompactionTask task;
 
-    private CompactionInfo.Holder info;
-
     public SSTableSplitter(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
     {
         this.task = new SplittingCompactionTask(cfs, transaction, sstableSizeInMB);
@@ -40,20 +38,7 @@ public class SSTableSplitter
 
     public void split()
     {
-        task.execute(new StatsCollector());
-    }
-
-    public class StatsCollector implements CompactionManager.CompactionExecutorStatsCollector
-    {
-        public void beginCompaction(CompactionInfo.Holder ci)
-        {
-            SSTableSplitter.this.info = ci;
-        }
-
-        public void finishCompaction(CompactionInfo.Holder ci)
-        {
-            // no-op
-        }
+        task.execute(ActiveCompactionsTracker.NOOP);
     }
 
     public static class SplittingCompactionTask extends CompactionTask
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index aa41051..63c89e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -475,7 +476,8 @@ public class Scrubber implements Closeable
                                           OperationType.SCRUB,
                                           dataFile.getFilePointer(),
                                           dataFile.length(),
-                                          scrubCompactionId);
+                                          scrubCompactionId,
+                                          ImmutableSet.of(sstable));
             }
             catch (Exception e)
             {
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
index 3522d61..02a1c49 100644
--- a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -57,7 +57,7 @@ public class SingleSSTableLCSTask extends AbstractCompactionTask
     }
 
     @Override
-    protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+    protected int executeInternal(ActiveCompactionsTracker activeCompactions)
     {
         run();
         return 1;
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 446d527..161620a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -500,7 +501,8 @@ public class Verifier implements Closeable
                                           OperationType.VERIFY,
                                           dataFile.getFilePointer(),
                                           dataFile.length(),
-                                          verificationCompactionId);
+                                          verificationCompactionId,
+                                          ImmutableSet.of(sstable));
             }
             catch (Exception e)
             {
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index caf1b8e..d653f6c 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionIterator;
@@ -107,9 +109,9 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
 
     private static class ValidationCompactionIterator extends CompactionIterator
     {
-        public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, CompactionMetrics metrics)
+        public ValidationCompactionIterator(List<ISSTableScanner> scanners, ValidationCompactionController controller, int nowInSec, ActiveCompactionsTracker activeCompactions)
         {
-            super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics);
+            super(OperationType.VALIDATION, scanners, controller, nowInSec, UUIDGen.getTimeUUID(), activeCompactions);
         }
     }
 
@@ -226,7 +228,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
         Preconditions.checkArgument(sstables != null);
         controller = new ValidationCompactionController(cfs, getDefaultGcBefore(cfs, nowInSec));
         scanners = cfs.getCompactionStrategyManager().getScanners(sstables, ranges);
-        ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.getMetrics());
+        ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, CompactionManager.instance.active);
 
         long allPartitions = 0;
         rangePartitionCounts = Maps.newHashMapWithExpectedSize(ranges.size());
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 1bc2fce..6040ea8 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.repair;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -29,8 +28,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -41,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -86,97 +86,116 @@ public class PendingAntiCompaction
         }
     }
 
-    static class SSTableAcquisitionException extends RuntimeException {}
+    static class SSTableAcquisitionException extends RuntimeException
+    {
+        SSTableAcquisitionException(String message)
+        {
+            super(message);
+        }
+    }
 
-    static class AcquisitionCallable implements Callable<AcquireResult>
+    @VisibleForTesting
+    static class AntiCompactionPredicate implements Predicate<SSTableReader>
     {
-        private final ColumnFamilyStore cfs;
         private final Collection<Range<Token>> ranges;
-        private final UUID sessionID;
+        private final UUID prsid;
 
-        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+        public AntiCompactionPredicate(Collection<Range<Token>> ranges, UUID prsid)
         {
-            this.cfs = cfs;
             this.ranges = ranges;
-            this.sessionID = sessionID;
+            this.prsid = prsid;
         }
 
-        private Iterable<SSTableReader> getSSTables()
+        public boolean apply(SSTableReader sstable)
         {
-            Set<UUID> conflictingSessions = new HashSet<>();
+            if (!sstable.intersects(ranges))
+                return false;
 
-            Iterable<SSTableReader> sstables = cfs.getLiveSSTables().stream().filter(sstable -> {
-                if (!sstable.intersects(ranges))
-                    return false;
+            StatsMetadata metadata = sstable.getSSTableMetadata();
 
-                StatsMetadata metadata = sstable.getSSTableMetadata();
-
-                // exclude repaired sstables
-                if (metadata.repairedAt != UNREPAIRED_SSTABLE)
-                    return false;
+            // exclude repaired sstables
+            if (metadata.repairedAt != UNREPAIRED_SSTABLE)
+                return false;
 
-                // exclude sstables pending repair, but record session ids for
-                // non-finalized sessions for a later error message
-                if (metadata.pendingRepair != NO_PENDING_REPAIR)
+            // exclude sstables pending repair, and fail the acquisition if the sstable
+            // belongs to a session that has not been finalized yet
+            if (metadata.pendingRepair != NO_PENDING_REPAIR)
+            {
+                if (!ActiveRepairService.instance.consistent.local.isSessionFinalized(metadata.pendingRepair))
                 {
-                    if (!ActiveRepairService.instance.consistent.local.isSessionFinalized(metadata.pendingRepair))
-                    {
-                        conflictingSessions.add(metadata.pendingRepair);
-                    }
-                    return false;
+                    String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " +
+                                                   "intersecting sstables belonging to another incremental repair session (%s). This is " +
+                                                   "caused by starting an incremental repair session before a previous one has completed. " +
+                                                   "Check nodetool repair_admin for hung sessions and fix them.", prsid, metadata.pendingRepair);
+                    throw new SSTableAcquisitionException(message);
                 }
-
-                return true;
-            }).collect(Collectors.toList());
-
-            // If there are sstables we'd like to acquire that are currently held by other sessions, we need to bail out. If we
-            // didn't bail out here and the other repair sessions we're seeing were to fail, incremental repair behavior would be
-            // confusing. You generally expect all data received before a repair session to be repaired when the session completes,
-            // and that wouldn't be the case if the other session failed and moved it's data back to unrepaired.
-            if (!conflictingSessions.isEmpty())
+                return false;
+            }
+            CompactionInfo ci = CompactionManager.instance.active.getCompactionForSSTable(sstable);
+            if (ci != null && ci.getTaskType() == OperationType.ANTICOMPACTION)
             {
-                logger.warn("Prepare phase for incremental repair session {} has failed because it encountered " +
-                            "intersecting sstables belonging to another incremental repair session(s) ({}). This is " +
-                            "caused by starting an incremental repair session before a previous one has completed. " +
-                            "Check nodetool repair_admin for hung sessions and fix them.",
-                            sessionID, conflictingSessions);
-                throw new SSTableAcquisitionException();
+                // todo: start tracking the parent repair session id that created the anticompaction to be able to give a better error message here:
+                String message = String.format("Prepare phase for incremental repair session %s has failed because it encountered " +
+                                               "intersecting sstables belonging to another incremental repair session. This is " +
+                                               "caused by starting multiple conflicting incremental repairs at the same time", prsid);
+                throw new SSTableAcquisitionException(message);
             }
+            return true;
+        }
+    }
+
+    static class AcquisitionCallable implements Callable<AcquireResult>
+    {
+        private final ColumnFamilyStore cfs;
+        private final UUID sessionID;
+        private final AntiCompactionPredicate predicate;
 
-            return sstables;
+        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID)
+        {
+            this.cfs = cfs;
+            this.sessionID = sessionID;
+            predicate = new AntiCompactionPredicate(ranges, sessionID);
         }
 
         @SuppressWarnings("resource")
         private AcquireResult acquireTuple()
         {
-            List<SSTableReader> sstables = Lists.newArrayList(getSSTables());
-            if (sstables.isEmpty())
-                return new AcquireResult(cfs, null, null);
+            // this method runs with compactions stopped & disabled
+            try
+            {
+                // using predicate might throw if there are conflicting ranges
+                Set<SSTableReader> sstables = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
+                if (sstables.isEmpty())
+                    return new AcquireResult(cfs, null, null);
+
+                LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+                if (txn != null)
+                    return new AcquireResult(cfs, Refs.ref(sstables), txn);
+            }
+            catch (SSTableAcquisitionException e)
+            {
+                logger.warn(e.getMessage());
+                logger.debug("Got exception trying to acquire sstables", e);
+            }
 
-            LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
-            if (txn != null)
-                return new AcquireResult(cfs, Refs.ref(sstables), txn);
-            else
-                return null;
+            return null;
         }
 
-        public AcquireResult call() throws Exception
+        public AcquireResult call()
         {
             logger.debug("acquiring sstables for pending anti compaction on session {}", sessionID);
+            // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions including the given sstables for
+            // up to a minute, after which point, null will be returned
             try
             {
-                AcquireResult refTxn = acquireTuple();
-                if (refTxn != null)
-                    return refTxn;
+                return cfs.runWithCompactionsDisabled(this::acquireTuple, predicate, false, false);
             }
             catch (SSTableAcquisitionException e)
             {
-                return null;
+                logger.warn(e.getMessage());
+                logger.debug("Got exception trying to acquire sstables", e);
             }
-
-            // try to modify after cancelling running compactions. This will attempt to cancel in flight compactions for
-            // up to a minute, after which point, null will be returned
-            return cfs.runWithCompactionsDisabled(this::acquireTuple, false, false);
+            return null;
         }
     }
 
@@ -223,11 +242,12 @@ public class PendingAntiCompaction
                         result.abort();
                     }
                 }
-                logger.warn("Prepare phase for incremental repair session {} was unable to " +
-                            "acquire exclusive access to the neccesary sstables. " +
-                            "This is usually caused by running multiple incremental repairs on nodes that share token ranges",
-                            parentRepairSession);
-                return Futures.immediateFailedFuture(new SSTableAcquisitionException());
+                String message = String.format("Prepare phase for incremental repair session %s was unable to " +
+                                               "acquire exclusive access to the neccesary sstables. " +
+                                               "This is usually caused by running multiple incremental repairs on nodes that share token ranges",
+                                               parentRepairSession);
+                logger.warn(message);
+                return Futures.immediateFailedFuture(new SSTableAcquisitionException(message));
             }
             else
             {
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index db78a54..d041b48 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -26,8 +26,10 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.util.concurrent.Futures;
@@ -76,7 +78,8 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
     private volatile boolean isStopped = false;
     private volatile boolean isCompactionInterrupted = false;
 
-    ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
+    @VisibleForTesting
+    public ViewBuilderTask(ColumnFamilyStore baseCfs, View view, Range<Token> range, Token lastToken, long keysBuilt)
     {
         this.baseCfs = baseCfs;
         this.view = view;
@@ -195,17 +198,20 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
     @Override
     public CompactionInfo getCompactionInfo()
     {
+        // we don't know the sstables at construction of ViewBuilderTask, and we could change this to return them once we know them,
+        // but since we basically only cancel view builds on truncation, where we cancel all compactions anyway, this seems reasonable
+
         // If there's splitter, calculate progress based on last token position
         if (range.left.getPartitioner().splitter().isPresent())
         {
             long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
-            return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
+            return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
         }
 
         // When there is no splitter, estimate based on number of total keys but
         // take the max with keysBuilt + 1 to avoid having more completed than total
         long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
-        return new CompactionInfo(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
+        return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 7019038..b9f38ce 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -159,7 +159,7 @@ public interface Index
         @SuppressWarnings("resource")
         public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables)
         {
-            return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables));
+            return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables), sstables);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index abe2460..ecd25fd 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -660,8 +660,8 @@ public abstract class CassandraIndex implements Index
     {
         // interrupt in-progress compactions
         Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
-        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
-        CompactionManager.instance.waitForCessation(cfss);
+        CompactionManager.instance.interruptCompactionForCFs(cfss, (sstable) -> true, true);
+        CompactionManager.instance.waitForCessation(cfss, (sstable) -> true);
         Keyspace.writeOrder.awaitNewBarrier();
         indexCfs.forceBlockingFlush();
         indexCfs.readOrdering.awaitNewBarrier();
@@ -709,7 +709,8 @@ public abstract class CassandraIndex implements Index
 
             SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
                                                                          Collections.singleton(this),
-                                                                         new ReducingKeyIterator(sstables));
+                                                                         new ReducingKeyIterator(sstables),
+                                                                         ImmutableSet.copyOf(sstables));
             Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
             FBUtilities.waitOnFuture(future);
             indexCfs.forceBlockingFlush();
diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
index 076346a..3c005c4 100644
--- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.index.internal;
 
+import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
 
@@ -28,6 +29,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexBuilder;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -39,13 +41,15 @@ public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
     private final Set<Index> indexers;
     private final ReducingKeyIterator iter;
     private final UUID compactionId;
+    private final Collection<SSTableReader> sstables;
 
-    public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+    public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter, Collection<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.indexers = indexers;
         this.iter = iter;
         this.compactionId = UUIDGen.getTimeUUID();
+        this.sstables = sstables;
     }
 
     public CompactionInfo getCompactionInfo()
@@ -54,7 +58,8 @@ public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
                 OperationType.INDEX_BUILD,
                 iter.getBytesRead(),
                 iter.getTotalBytes(),
-                compactionId);
+                compactionId,
+                sstables);
     }
 
     public void build()
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index a01e45b..bb42dc2 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -127,7 +127,8 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                                   OperationType.INDEX_BUILD,
                                   bytesProcessed,
                                   totalSizeInBytes,
-                                  compactionId);
+                                  compactionId,
+                                  sstables.keySet());
     }
 
     private long getPrimaryIndexLength(SSTable sstable)
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 1c1d74b..af661b7 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -20,10 +20,12 @@
 package org.apache.cassandra.io.sstable;
 
 import java.util.Collection;
+import java.util.Set;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -36,7 +38,7 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator
     public long getCompressedLengthInBytes();
     public long getCurrentPosition();
     public long getBytesScanned();
-    public String getBackingFiles();
+    public Set<SSTableReader> getBackingSSTables();
 
     public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable)
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 0e1d0f0..b4fca41 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -301,7 +301,7 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
 
     public CompactionInfo getCompactionInfo()
     {
-        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
+        return new CompactionInfo(OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId, compacting);
     }
 
     /** Utility class for sorting sstables by their read rates. */
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index cb25c6b..5097d96 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.AbstractIterator;
+
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.db.*;
@@ -241,9 +243,9 @@ public class BigTableScanner implements ISSTableScanner
         return sstable.onDiskLength();
     }
 
-    public String getBackingFiles()
+    public Set<SSTableReader> getBackingSSTables()
     {
-        return sstable.toString();
+        return ImmutableSet.of(sstable);
     }
 
 
@@ -420,9 +422,9 @@ public class BigTableScanner implements ISSTableScanner
             return 0;
         }
 
-        public String getBackingFiles()
+        public Set<SSTableReader> getBackingSSTables()
         {
-            return sstable.getFilename();
+            return ImmutableSet.of(sstable);
         }
 
         public TableMetadata metadata()
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index d54090b..46e5940 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -26,8 +26,10 @@ import com.codahale.metrics.Meter;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.ActiveCompactions;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 
@@ -36,13 +38,10 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 /**
  * Metrics for compaction.
  */
-public class CompactionMetrics implements CompactionManager.CompactionExecutorStatsCollector
+public class CompactionMetrics
 {
     public static final MetricNameFactory factory = new DefaultNameFactory("Compaction");
 
-    // a synchronized identity set of running tasks to their compaction info
-    private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
-
     /** Estimated number of compactions remaining to perform */
     public final Gauge<Integer> pendingTasks;
     /** Estimated number of compactions remaining to perform, group by keyspace and then table name */
@@ -79,7 +78,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
                         n += cfs.getCompactionStrategyManager().getEstimatedRemainingTasks();
                 }
                 // add number of currently running compactions
-                return n + compactions.size();
+                return n + CompactionManager.instance.active.getCompactions().size();
             }
         });
 
@@ -108,7 +107,7 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
                 }
 
                 // currently running compactions
-                for (CompactionInfo.Holder compaction : compactions)
+                for (CompactionInfo.Holder compaction : CompactionManager.instance.active.getCompactions())
                 {
                     TableMetadata metaData = compaction.getCompactionInfo().getTableMetadata();
                     if (metaData == null)
@@ -153,21 +152,4 @@ public class CompactionMetrics implements CompactionManager.CompactionExecutorSt
         sstablesDropppedFromCompactions = Metrics.counter(factory.createMetricName("SSTablesDroppedFromCompaction"));
         compactionsAborted = Metrics.counter(factory.createMetricName("CompactionsAborted"));
     }
-
-    public void beginCompaction(CompactionInfo.Holder ci)
-    {
-        compactions.add(ci);
-    }
-
-    public void finishCompaction(CompactionInfo.Holder ci)
-    {
-        compactions.remove(ci);
-        bytesCompacted.inc(ci.getCompactionInfo().getTotal());
-        totalCompactionsCompleted.mark();
-    }
-
-    public static List<CompactionInfo.Holder> getCompactions()
-    {
-        return new ArrayList<CompactionInfo.Holder>(compactions);
-    }
 }
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 970d9ac..2c2d444 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -759,7 +759,7 @@ public final class Schema
         assert cfs != null;
         // make sure all the indexes are dropped, or else.
         cfs.indexManager.markAllIndexesRemoved();
-        CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), true);
+        CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), (sstable) -> true, true);
         if (DatabaseDescriptor.isAutoSnapshot())
             cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX));
         CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 912b03f..fe8cdc2 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -127,7 +127,7 @@ public class LongCompactionsTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION))
         {
             assert txn != null : "Cannot markCompacting all sstables";
-            new CompactionTask(store, txn, gcBefore).execute(null);
+            new CompactionTask(store, txn, gcBefore).execute(ActiveCompactionsTracker.NOOP);
         }
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 3bcd9d1..f8f94a0 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -97,7 +97,7 @@ public class LongLeveledCompactionStrategyTest
                 {
                     public void run()
                     {
-                        nextTask.execute(null);
+                        nextTask.execute(ActiveCompactionsTracker.NOOP);
                     }
                 });
             }
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index d413fd7..3054be6 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -40,6 +40,7 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaCollection;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -245,7 +246,7 @@ public class Util
         int gcBefore = cfs.gcBefore(FBUtilities.nowInSeconds());
         List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstables, gcBefore);
         for (AbstractCompactionTask task : tasks)
-            task.execute(null);
+            task.execute(ActiveCompactionsTracker.NOOP);
     }
 
     public static void expectEOF(Callable<?> callable)
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index 4d62894..431fb32 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -114,7 +114,7 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
         return sstable;
     }
 
-    protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
+    public static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
     {
         try
         {
@@ -127,12 +127,12 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
         }
     }
 
-    protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
+    public static void mutateRepaired(SSTableReader sstable, long repairedAt)
     {
         mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
     }
 
-    protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
+    public static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
     {
         mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient);
     }
diff --git a/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
new file mode 100644
index 0000000..23e393d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/ActiveCompactionsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.cache.AutoSavingCache;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.db.view.ViewBuilderTask;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.CacheService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ActiveCompactionsTest extends CQLTester
+{
+    @Test
+    public void testSecondaryIndexTracking() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+        String idxName = createIndex("CREATE INDEX on %s(a)");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+
+        Index idx = getCurrentColumnFamilyStore().indexManager.getIndexByName(idxName);
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        SecondaryIndexBuilder builder = idx.getBuildTaskSupport().getIndexBuildTask(getCurrentColumnFamilyStore(), Collections.singleton(idx), sstables);
+
+        MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+        CompactionManager.instance.submitIndexBuild(builder, mockActiveCompactions).get();
+
+        assertTrue(mockActiveCompactions.finished);
+        assertNotNull(mockActiveCompactions.holder);
+        assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables());
+    }
+
+    @Test
+    public void testIndexSummaryRedistributionTracking() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES ("+i+", 2, 3, 4)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
+        try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstables, OperationType.INDEX_SUMMARY))
+        {
+            Map<TableId, LifecycleTransaction> transactions = ImmutableMap.<TableId, LifecycleTransaction>builder().put(getCurrentColumnFamilyStore().metadata().id, txn).build();
+            IndexSummaryRedistribution isr = new IndexSummaryRedistribution(new ArrayList<>(sstables), transactions, 1000);
+            MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+            CompactionManager.instance.runIndexSummaryRedistribution(isr, mockActiveCompactions);
+            assertTrue(mockActiveCompactions.finished);
+            assertNotNull(mockActiveCompactions.holder);
+            assertEquals(sstables, mockActiveCompactions.holder.getCompactionInfo().getSSTables());
+        }
+    }
+
+    @Test
+    public void testViewBuildTracking() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k1 int, c1 int , val int, PRIMARY KEY (k1, c1))");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (k1, c1, val) VALUES ("+i+", 2, 3)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        execute(String.format("CREATE MATERIALIZED VIEW %s.view1 AS SELECT k1, c1, val FROM %s.%s WHERE k1 IS NOT NULL AND c1 IS NOT NULL AND val IS NOT NULL PRIMARY KEY (val, k1, c1)", keyspace(), keyspace(), currentTable()));
+        View view = Iterables.getOnlyElement(getCurrentColumnFamilyStore().viewManager);
+
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        ViewBuilderTask vbt = new ViewBuilderTask(getCurrentColumnFamilyStore(), view, new Range<>(token, token), token, 0);
+
+        MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+        CompactionManager.instance.submitViewBuilder(vbt, mockActiveCompactions).get();
+        assertTrue(mockActiveCompactions.finished);
+        assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
+        // a view build tracks no sstables, so it should report shouldStop for any predicate, even one matching nothing
+        assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((sstable) -> false));
+    }
+
+    @Test
+    public void testScrubOne() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+
+        SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
+        try (LifecycleTransaction txn = getCurrentColumnFamilyStore().getTracker().tryModify(sstable, OperationType.SCRUB))
+        {
+            MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+            CompactionManager.instance.scrubOne(getCurrentColumnFamilyStore(), txn, true, false, false, mockActiveCompactions);
+
+            assertTrue(mockActiveCompactions.finished);
+            assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable));
+            assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false));
+            assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));
+        }
+
+    }
+
+    @Test
+    public void testVerifyOne() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, a int, b int, PRIMARY KEY (pk, ck))");
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 5; i++)
+        {
+            execute("INSERT INTO %s (pk, ck, a, b) VALUES (" + i + ", 2, 3, 4)");
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+
+        SSTableReader sstable = Iterables.getFirst(getCurrentColumnFamilyStore().getLiveSSTables(), null);
+        MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+        CompactionManager.instance.verifyOne(getCurrentColumnFamilyStore(), sstable, new Verifier.Options.Builder().build(), mockActiveCompactions);
+        assertTrue(mockActiveCompactions.finished);
+        assertEquals(mockActiveCompactions.holder.getCompactionInfo().getSSTables(), Sets.newHashSet(sstable));
+        assertFalse(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> false));
+        assertTrue(mockActiveCompactions.holder.getCompactionInfo().shouldStop((s) -> true));
+    }
+
+    @Test
+    public void testSubmitCacheWrite() throws ExecutionException, InterruptedException
+    {
+        AutoSavingCache.Writer writer = CacheService.instance.keyCache.getWriter(100);
+        MockActiveCompactions mockActiveCompactions = new MockActiveCompactions();
+        CompactionManager.instance.submitCacheWrite(writer, mockActiveCompactions).get();
+        assertTrue(mockActiveCompactions.finished);
+        assertTrue(mockActiveCompactions.holder.getCompactionInfo().getSSTables().isEmpty());
+    }
+
+    private static class MockActiveCompactions implements ActiveCompactionsTracker
+    {
+        public CompactionInfo.Holder holder;
+        public boolean finished = false;
+        public void beginCompaction(CompactionInfo.Holder ci)
+        {
+            holder = ci;
+        }
+
+        public void finishCompaction(CompactionInfo.Holder ci)
+        {
+            finished = true;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
new file mode 100644
index 0000000..4b05fc4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CancelCompactionsTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.repair.PendingAntiCompaction;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.MockSchema;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class CancelCompactionsTest
+{
+    @BeforeClass
+    public static void setup()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    /**
+     * makes sure we only cancel compactions if the predicate says we have overlapping sstables
+     */
+    @Test
+    public void cancelTest() throws InterruptedException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+        Set<SSTableReader> toMarkCompacting = new HashSet<>(sstables.subList(0, 3));
+
+        TestCompactionTask tct = new TestCompactionTask(cfs, toMarkCompacting);
+        try
+        {
+            tct.start();
+
+            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+            assertEquals(1, activeCompactions.size());
+            assertEquals(activeCompactions.get(0).getCompactionInfo().getSSTables(), toMarkCompacting);
+            // predicate requires the non-compacting sstables, should not cancel the one currently compacting:
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> !toMarkCompacting.contains(sstable), false, false);
+            assertEquals(1, activeCompactions.size());
+            assertFalse(activeCompactions.get(0).isStopRequested());
+
+            // predicate requires the compacting ones - make sure stop is requested and that when we abort that
+            // compaction we actually run the callable (countdown the latch)
+            CountDownLatch cdl = new CountDownLatch(1);
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, toMarkCompacting::contains, false, false));
+            t.start();
+            while (!activeCompactions.get(0).isStopRequested())
+                Thread.sleep(100);
+
+            // cdl.countDown will not get executed until we have aborted all compactions for the sstables in toMarkCompacting
+            assertFalse(cdl.await(2, TimeUnit.SECONDS));
+            tct.abort();
+            // now the compactions are aborted and we can successfully wait for the latch
+            t.join();
+            assertTrue(cdl.await(2, TimeUnit.SECONDS));
+        }
+        finally
+        {
+            tct.abort();
+        }
+    }
+
+    /**
+     * make sure we only cancel relevant compactions when there are multiple ongoing compactions
+     */
+    @Test
+    public void multipleCompactionsCancelTest() throws InterruptedException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+
+        List<TestCompactionTask> tcts = new ArrayList<>();
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 3))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(6, 9))));
+
+        try
+        {
+            tcts.forEach(TestCompactionTask::start);
+
+            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+            assertEquals(2, activeCompactions.size());
+
+            Set<Set<SSTableReader>> compactingSSTables = new HashSet<>();
+            compactingSSTables.add(activeCompactions.get(0).getCompactionInfo().getSSTables());
+            compactingSSTables.add(activeCompactions.get(1).getCompactionInfo().getSSTables());
+            Set<Set<SSTableReader>> expectedSSTables = new HashSet<>();
+            expectedSSTables.add(new HashSet<>(sstables.subList(0, 3)));
+            expectedSSTables.add(new HashSet<>(sstables.subList(6, 9)));
+            assertEquals(compactingSSTables, expectedSSTables);
+
+            cfs.runWithCompactionsDisabled(() -> null, (sstable) -> false, false, false);
+            assertEquals(2, activeCompactions.size());
+            assertTrue(activeCompactions.stream().noneMatch(CompactionInfo.Holder::isStopRequested));
+
+            CountDownLatch cdl = new CountDownLatch(1);
+            // start a compaction which only needs the sstables where first token is > 50 - these are the sstables compacted by tcts.get(1)
+            Thread t = new Thread(() -> cfs.runWithCompactionsDisabled(() -> { cdl.countDown(); return null; }, (sstable) -> first(sstable) > 50, false, false));
+            t.start();
+            activeCompactions = CompactionManager.instance.active.getCompactions();
+            assertEquals(2, activeCompactions.size());
+            Thread.sleep(500);
+            for (CompactionInfo.Holder holder : activeCompactions)
+            {
+                if (holder.getCompactionInfo().getSSTables().containsAll(sstables.subList(6, 9)))
+                    assertTrue(holder.isStopRequested());
+                else
+                    assertFalse(holder.isStopRequested());
+            }
+            tcts.get(1).abort();
+            assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+            cdl.await();
+            t.join();
+        }
+        finally
+        {
+            tcts.forEach(TestCompactionTask::abort);
+        }
+    }
+
+    /**
+     * Makes sure sub range compaction now only cancels the relevant compactions, not all of them
+     */
+    @Test
+    public void testSubrangeCompaction() throws InterruptedException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+
+        List<TestCompactionTask> tcts = new ArrayList<>();
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
+        try
+        {
+            tcts.forEach(TestCompactionTask::start);
+
+            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+            assertEquals(4, activeCompactions.size());
+            Range<Token> range = new Range<>(token(0), token(49));
+            Thread t = new Thread(() -> {
+                try
+                {
+                    cfs.forceCompactionForTokenRange(Collections.singleton(range));
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            t.start();
+
+            Thread.sleep(500);
+            assertEquals(4, CompactionManager.instance.active.getCompactions().size());
+            List<TestCompactionTask> toAbort = new ArrayList<>();
+            for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+            {
+                if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range))))
+                {
+                    assertTrue(holder.isStopRequested());
+                    for (TestCompactionTask tct : tcts)
+                        if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
+                            toAbort.add(tct);
+                }
+                else
+                    assertFalse(holder.isStopRequested());
+            }
+            assertEquals(2, toAbort.size());
+            toAbort.forEach(TestCompactionTask::abort);
+            t.join();
+
+        }
+        finally
+        {
+            tcts.forEach(TestCompactionTask::abort);
+        }
+    }
+
+    @Test
+    public void testAnticompaction() throws InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> sstables = createSSTables(cfs, 10, 0);
+        List<SSTableReader> alreadyRepairedSSTables = createSSTables(cfs, 10, 10);
+        for (SSTableReader sstable : alreadyRepairedSSTables)
+            AbstractPendingRepairTest.mutateRepaired(sstable, System.currentTimeMillis());
+        assertEquals(20, cfs.getLiveSSTables().size());
+        List<TestCompactionTask> tcts = new ArrayList<>();
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(0, 2))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(3, 4))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(5, 7))));
+        tcts.add(new TestCompactionTask(cfs, new HashSet<>(sstables.subList(8, 9))));
+
+        List<TestCompactionTask> nonAffectedTcts = new ArrayList<>();
+        nonAffectedTcts.add(new TestCompactionTask(cfs, new HashSet<>(alreadyRepairedSSTables)));
+
+        try
+        {
+            tcts.forEach(TestCompactionTask::start);
+            nonAffectedTcts.forEach(TestCompactionTask::start);
+            List<CompactionInfo.Holder> activeCompactions = CompactionManager.instance.active.getCompactions();
+            assertEquals(5, activeCompactions.size());
+            // make sure that sstables are fully contained so that the metadata gets mutated
+            Range<Token> range = new Range<>(token(-1), token(49));
+
+            UUID prsid = UUID.randomUUID();
+            ActiveRepairService.instance.registerParentRepairSession(prsid, InetAddressAndPort.getLocalHost(), Collections.singletonList(cfs), Collections.singleton(range), true, 1, true, PreviewKind.NONE);
+
+            InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+            RangesAtEndpoint rae = RangesAtEndpoint.builder(local).add(new Replica(local, range, true)).build();
+
+            PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), rae, Executors.newSingleThreadExecutor());
+            Future<?> fut = pac.run();
+            Thread.sleep(600);
+            List<TestCompactionTask> toAbort = new ArrayList<>();
+            for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+            {
+                if (holder.getCompactionInfo().getSSTables().stream().anyMatch(sstable -> sstable.intersects(Collections.singleton(range)) && !sstable.isRepaired() && !sstable.isPendingRepair()))
+                {
+                    assertTrue(holder.isStopRequested());
+                    for (TestCompactionTask tct : tcts)
+                        if (tct.sstables.equals(holder.getCompactionInfo().getSSTables()))
+                            toAbort.add(tct);
+                }
+                else
+                    assertFalse(holder.isStopRequested());
+            }
+            assertEquals(2, toAbort.size());
+            toAbort.forEach(TestCompactionTask::abort);
+            fut.get();
+            for (SSTableReader sstable : sstables)
+                assertTrue(!sstable.intersects(Collections.singleton(range)) || sstable.isPendingRepair());
+        }
+        finally
+        {
+            tcts.forEach(TestCompactionTask::abort);
+            nonAffectedTcts.forEach(TestCompactionTask::abort);
+        }
+    }
+
+    /**
+     * Make sure index rebuilds get cancelled
+     */
+    @Test
+    public void testIndexRebuild() throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        List<SSTableReader> sstables = createSSTables(cfs, 5, 0);
+        Index idx = new StubIndex(cfs, null);
+        CountDownLatch indexBuildStarted = new CountDownLatch(1);
+        CountDownLatch indexBuildRunning = new CountDownLatch(1);
+        CountDownLatch compactionsStopped = new CountDownLatch(1);
+        ReducingKeyIterator reducingKeyIterator = new ReducingKeyIterator(sstables)
+        {
+            @Override
+            public boolean hasNext() // blocking stub: lets the test observe the index build while it is "running"
+            {
+                indexBuildStarted.countDown(); // signal the test thread that the build has reached the iterator
+                try
+                {
+                    indexBuildRunning.await(); // park here until the test counts this latch down
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e); // keep the cause so an interrupted build is diagnosable
+                }
+                return false; // report no keys, so iteration ends as soon as the latch is released
+            }
+        };
+        Future<?> f = CompactionManager.instance.submitIndexBuild(new CollatedViewIndexBuilder(cfs, Collections.singleton(idx), reducingKeyIterator, ImmutableSet.copyOf(sstables)));
+        // wait for hasNext to get called
+        indexBuildStarted.await();
+        assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+        boolean foundCompaction = false;
+        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+        {
+            if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
+            {
+                assertFalse(holder.isStopRequested());
+                foundCompaction = true;
+            }
+        }
+        assertTrue(foundCompaction);
+        cfs.runWithCompactionsDisabled(() -> {compactionsStopped.countDown(); return null;}, (sstable) -> true, false, false);
+        // wait for the runWithCompactionsDisabled callable
+        compactionsStopped.await();
+        assertEquals(1, CompactionManager.instance.active.getCompactions().size());
+        foundCompaction = false;
+        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
+        {
+            if (holder.getCompactionInfo().getSSTables().equals(new HashSet<>(sstables)))
+            {
+                assertTrue(holder.isStopRequested());
+                foundCompaction = true;
+            }
+        }
+        assertTrue(foundCompaction);
+        // signal that the index build should be finished
+        indexBuildRunning.countDown();
+        f.get();
+        assertTrue(CompactionManager.instance.active.getCompactions().isEmpty());
+    }
+
+    long first(SSTableReader sstable)
+    {
+        return (long)sstable.first.getToken().getTokenValue();
+    }
+
+    Token token(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    private List<SSTableReader> createSSTables(ColumnFamilyStore cfs, int count, int startGeneration)
+    {
+        List<SSTableReader> sstables = new ArrayList<>();
+        for (int i = 0; i < count; i++)
+        {
+            long first = i * 10;
+            long last  = (i + 1) * 10 - 1;
+            sstables.add(MockSchema.sstable(startGeneration + i, 0, true, first, last, cfs));
+        }
+        cfs.disableAutoCompaction();
+        cfs.addSSTables(sstables);
+        return sstables;
+    }
+
+    private static class TestCompactionTask
+    {
+        private ColumnFamilyStore cfs;
+        private final Set<SSTableReader> sstables;
+        private LifecycleTransaction txn;
+        private CompactionController controller;
+        private CompactionIterator ci;
+        private List<ISSTableScanner> scanners;
+
+        public TestCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
+        {
+            this.cfs = cfs;
+            this.sstables = sstables;
+        }
+
+        public void start()
+        {
+            scanners = sstables.stream().map(SSTableReader::getScanner).collect(Collectors.toList());
+            txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+            assertNotNull(txn);
+            controller = new CompactionController(cfs, sstables, Integer.MIN_VALUE);
+            ci = new CompactionIterator(txn.opType(), scanners, controller, FBUtilities.nowInSeconds(), UUID.randomUUID());
+            CompactionManager.instance.active.beginCompaction(ci);
+        }
+
+        public void abort() // releases whatever start() managed to set up; called from finally blocks, possibly twice
+        {
+            if (controller != null)
+                controller.close();
+            if (ci != null)
+                ci.close();
+            if (txn != null)
+                txn.abort();
+            if (scanners != null)
+                scanners.forEach(ISSTableScanner::close);
+            if (ci != null) // start() can fail (assertNotNull(txn)) before ci is assigned; don't hand null to the tracker
+                CompactionManager.instance.active.finishCompaction(ci);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
index 864ef3e..bb02dab 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
 
@@ -461,9 +462,9 @@ public class CompactionIteratorTest
         }
 
         @Override
-        public String getBackingFiles()
+        public Set<SSTableReader> getBackingSSTables()
         {
-            return null;
+            return ImmutableSet.of();
         }
     }
 }
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index 267c2e4..9f2bc2e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -252,7 +252,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
 
         // run the compaction
-        compactionTask.execute(null);
+        compactionTask.execute(ActiveCompactionsTracker.NOOP);
 
         Assert.assertTrue(repairedContains(sstable));
         Assert.assertFalse(unrepairedContains(sstable));
@@ -293,7 +293,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
 
         // run the compaction
-        compactionTask.execute(null);
+        compactionTask.execute(ActiveCompactionsTracker.NOOP);
 
         Assert.assertFalse(repairedContains(sstable));
         Assert.assertTrue(unrepairedContains(sstable));
@@ -330,7 +330,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
 
         // run the compaction
-        compactionTask.execute(null);
+        compactionTask.execute(ActiveCompactionsTracker.NOOP);
 
         Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
         Assert.assertFalse(hasPendingStrategiesFor(repairID));
@@ -361,7 +361,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
 
         // run the compaction
-        compactionTask.execute(null);
+        compactionTask.execute(ActiveCompactionsTracker.NOOP);
 
         Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
         Assert.assertFalse(hasPendingStrategiesFor(repairID));
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
index 5370f33..af74603 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -86,7 +86,7 @@ public class CompactionTaskTest
         cfs.getCompactionStrategyManager().pause();
         try
         {
-            task.execute(CompactionManager.instance.getMetrics());
+            task.execute(CompactionManager.instance.active);
             Assert.fail("Expected CompactionInterruptedException");
         }
         catch (CompactionInterruptedException e)
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 857fa32..b003721 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -415,7 +415,7 @@ public class CompactionsCQLTest extends CQLTester
         AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
         // we should be compacting all 50 sstables:
         assertEquals(50, act.transaction.originals().size());
-        act.execute(null);
+        act.execute(ActiveCompactionsTracker.NOOP);
     }
 
     @Test
@@ -452,7 +452,7 @@ public class CompactionsCQLTest extends CQLTester
         assertEquals(0, ((LeveledCompactionTask)act).getLevel());
         assertTrue(act.transaction.originals().stream().allMatch(s -> s.getSSTableLevel() == 0));
         txn.abort(); // unmark the l1 sstable compacting
-        act.execute(null);
+        act.execute(ActiveCompactionsTracker.NOOP);
     }
 
     private void prepareWide() throws Throwable
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 4573485..dcd5270 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -305,7 +305,7 @@ public class CompactionsPurgeTest
         cfs.forceBlockingFlush();
         List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
         assertEquals(1, tasks.size());
-        tasks.get(0).execute(null);
+        tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
 
         // verify that minor compaction does GC when key is provably not
         // present in a non-compacted sstable
@@ -356,7 +356,7 @@ public class CompactionsPurgeTest
         // compact the sstables with the c1/c2 data and the c1 tombstone
         List<AbstractCompactionTask> tasks = cfs.getCompactionStrategyManager().getUserDefinedTasks(sstablesIncomplete, Integer.MAX_VALUE);
         assertEquals(1, tasks.size());
-        tasks.get(0).execute(null);
+        tasks.get(0).execute(ActiveCompactionsTracker.NOOP);
 
         // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
         // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index b1aaf8a..6c75e7b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -340,7 +340,7 @@ public class LeveledCompactionStrategyTest
         waitForLeveling(cfs);
         cfs.disableAutoCompaction();
 
-        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true))
             Thread.sleep(100);
 
         CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
@@ -437,7 +437,7 @@ public class LeveledCompactionStrategyTest
         Collection<Range<Token>> tokenRanges = new ArrayList<>(Arrays.asList(tokenRange));
         cfs.forceCompactionForTokenRange(tokenRanges);
 
-        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true)) {
             Thread.sleep(100);
         }
 
@@ -450,7 +450,7 @@ public class LeveledCompactionStrategyTest
         cfs.forceCompactionForTokenRange(tokenRanges2);
 
 
-        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs))) {
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs), (sstable) -> true)) {
             Thread.sleep(100);
         }
 
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
index 1292b7e..61cf302 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -100,7 +100,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
         // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
         LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
-        act.execute(null);
+        act.execute(ActiveCompactionsTracker.NOOP);
 
         // now all sstables are laid out non-overlapping in L1, this means that the rest of the compactions
         // will be single sstable ones, make sure that we use SingleSSTableLCSTask if singleSSTUplevel is true:
@@ -108,7 +108,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
         {
             act = lcs.getNextBackgroundTask(0);
             assertEquals(singleSSTUplevel, act instanceof SingleSSTableLCSTask);
-            act.execute(null);
+            act.execute(ActiveCompactionsTracker.NOOP);
         }
         assertEquals(0, lcs.getLevelSize(0));
         int l1size = lcs.getLevelSize(1);
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 12a429b..385deac 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -50,13 +50,16 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.AbstractPendingRepairTest;
 import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.CompactionIterator;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -65,6 +68,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -74,6 +78,11 @@ import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
 {
     static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
@@ -115,7 +124,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
             QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (?, ?)", ks, tbl), i, i);
         }
         cfs.forceBlockingFlush();
-        Assert.assertEquals(2, cfs.getLiveSSTables().size());
+        assertEquals(2, cfs.getLiveSSTables().size());
 
         Token left = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 6));
         Token right = ByteOrderedPartitioner.instance.getToken(ByteBufferUtil.bytes((int) 16));
@@ -138,14 +147,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
             executor.shutdown();
         }
 
-        Assert.assertEquals(3, cfs.getLiveSSTables().size());
+        assertEquals(3, cfs.getLiveSSTables().size());
         int pendingRepair = 0;
         for (SSTableReader sstable : cfs.getLiveSSTables())
         {
             if (sstable.isPendingRepair())
                 pendingRepair++;
         }
-        Assert.assertEquals(2, pendingRepair);
+        assertEquals(2, pendingRepair);
     }
 
     @Test
@@ -168,14 +177,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         Assert.assertNotNull(result);
         logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(3, result.txn.originals().size());
+        assertEquals(3, result.txn.originals().size());
         for (SSTableReader sstable : expected)
         {
             logger.info("Checking {}", sstable);
-            Assert.assertTrue(result.txn.originals().contains(sstable));
+            assertTrue(result.txn.originals().contains(sstable));
         }
 
-        Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
+        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
         result.abort();
     }
 
@@ -186,11 +195,11 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         makeSSTables(2);
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        Assert.assertEquals(2, sstables.size());
+        assertEquals(2, sstables.size());
         SSTableReader repaired = sstables.get(0);
         SSTableReader unrepaired = sstables.get(1);
-        Assert.assertTrue(repaired.intersects(FULL_RANGE));
-        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+        assertTrue(repaired.intersects(FULL_RANGE));
+        assertTrue(unrepaired.intersects(FULL_RANGE));
 
         repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
         repaired.reloadSSTableMetadata();
@@ -200,8 +209,8 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         Assert.assertNotNull(result);
 
         logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(1, result.txn.originals().size());
-        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        assertEquals(1, result.txn.originals().size());
+        assertTrue(result.txn.originals().contains(unrepaired));
         result.abort(); // release sstable refs
     }
 
@@ -212,25 +221,25 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         makeSSTables(2);
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        Assert.assertEquals(2, sstables.size());
+        assertEquals(2, sstables.size());
         SSTableReader repaired = sstables.get(0);
         SSTableReader unrepaired = sstables.get(1);
-        Assert.assertTrue(repaired.intersects(FULL_RANGE));
-        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+        assertTrue(repaired.intersects(FULL_RANGE));
+        assertTrue(unrepaired.intersects(FULL_RANGE));
 
         UUID sessionId = prepareSession();
         LocalSessionAccessor.finalizeUnsafe(sessionId);
         repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
         repaired.reloadSSTableMetadata();
-        Assert.assertTrue(repaired.isPendingRepair());
+        assertTrue(repaired.isPendingRepair());
 
         PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         Assert.assertNotNull(result);
 
         logger.info("Originals: {}", result.txn.originals());
-        Assert.assertEquals(1, result.txn.originals().size());
-        Assert.assertTrue(result.txn.originals().contains(unrepaired));
+        assertEquals(1, result.txn.originals().size());
+        assertTrue(result.txn.originals().contains(unrepaired));
         result.abort();  // releases sstable refs
     }
 
@@ -241,16 +250,16 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         makeSSTables(2);
 
         List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
-        Assert.assertEquals(2, sstables.size());
+        assertEquals(2, sstables.size());
         SSTableReader repaired = sstables.get(0);
         SSTableReader unrepaired = sstables.get(1);
-        Assert.assertTrue(repaired.intersects(FULL_RANGE));
-        Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
+        assertTrue(repaired.intersects(FULL_RANGE));
+        assertTrue(unrepaired.intersects(FULL_RANGE));
 
         UUID sessionId = prepareSession();
         repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, sessionId, false);
         repaired.reloadSSTableMetadata();
-        Assert.assertTrue(repaired.isPendingRepair());
+        assertTrue(repaired.isPendingRepair());
 
         PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
@@ -262,7 +271,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
     {
         cfs.disableAutoCompaction();
 
-        Assert.assertEquals(0, cfs.getLiveSSTables().size());
+        assertEquals(0, cfs.getLiveSSTables().size());
 
         PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
@@ -285,11 +294,11 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         Assert.assertNotNull(result);
 
         InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result));
 
-        Assert.assertEquals(1, cb.submittedCompactions.size());
-        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
+        assertEquals(1, cb.submittedCompactions.size());
+        assertTrue(cb.submittedCompactions.contains(cfm.id));
     }
 
     /**
@@ -306,14 +315,14 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         Assert.assertNotNull(result);
-        Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
+        assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
 
         InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result, null));
 
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
-        Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state());
+        assertTrue(cb.submittedCompactions.isEmpty());
+        assertEquals(Transactional.AbstractTransactional.State.ABORTED, result.txn.state());
     }
 
     /**
@@ -334,12 +343,12 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
 
         InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
-        Assert.assertTrue(cb.submittedCompactions.isEmpty());
+        assertTrue(cb.submittedCompactions.isEmpty());
         cb.apply(Lists.newArrayList(result, fakeResult));
 
-        Assert.assertEquals(1, cb.submittedCompactions.size());
-        Assert.assertTrue(cb.submittedCompactions.contains(cfm.id));
-        Assert.assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
+        assertEquals(1, cb.submittedCompactions.size());
+        assertTrue(cb.submittedCompactions.contains(cfm.id));
+        assertFalse(cb.submittedCompactions.contains(cfs2.metadata.id));
     }
 
 
@@ -398,7 +407,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         try
         {
             fut.get();
-            Assert.fail("Should throw exception");
+            fail("Should throw exception");
         }
         catch(Throwable t)
         {
@@ -406,7 +415,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
     }
 
     @Test
-    public void testBlockedAcquisition() throws ExecutionException, InterruptedException
+    public void testBlockedAcquisition() throws ExecutionException, InterruptedException, TimeoutException
     {
         cfs.disableAutoCompaction();
         ExecutorService es = Executors.newFixedThreadPool(1);
@@ -419,33 +428,27 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         {
             try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                  CompactionController controller = new CompactionController(cfs, sstables, 0);
-                 CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.getMetrics()))
+                 CompactionIterator ci = CompactionManager.getAntiCompactionIterator(scanners, controller, 0, UUID.randomUUID(), CompactionManager.instance.active))
             {
                 // `ci` is our imaginary ongoing anticompaction which makes no progress until after 30s
                 // now we try to start a new AC, which will try to cancel all ongoing compactions
 
-                CompactionManager.instance.getMetrics().beginCompaction(ci);
+                CompactionManager.instance.active.beginCompaction(ci);
                 PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
                 ListenableFuture fut = pac.run();
                 try
                 {
                     fut.get(30, TimeUnit.SECONDS);
+                    fail("the future should throw exception since we try to start a new anticompaction when one is already running");
                 }
-                catch (TimeoutException e)
-                {
-                    // expected, we wait 1 minute for compactions to get cancelled in runWithCompactionsDisabled
-                }
-                Assert.assertTrue(ci.hasNext());
-                ci.next(); // this would throw exception if the CompactionIterator was abortable
-                try
+                catch (ExecutionException e)
                 {
-                    fut.get();
-                    Assert.fail("We should get exception when trying to start a new anticompaction with the same sstables");
+                    assertTrue(e.getCause() instanceof PendingAntiCompaction.SSTableAcquisitionException);
                 }
-                catch (Throwable t)
-                {
 
-                }
+                assertEquals(1, getCompactionsFor(cfs).size());
+                for (CompactionInfo.Holder holder : getCompactionsFor(cfs))
+                    assertFalse(holder.isStopRequested());
             }
         }
         finally
@@ -455,6 +458,17 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         }
     }
 
+    /**
+     * Returns the active compactions whose table metadata equals {@code cfs.metadata()}.
+     */
+    private List<CompactionInfo.Holder> getCompactionsFor(ColumnFamilyStore cfs)
+    {
+        return CompactionManager.instance.active.getCompactions()
+                                                .stream()
+                                                .filter(running -> running.getCompactionInfo().getTableMetadata().equals(cfs.metadata()))
+                                                .collect(Collectors.toList());
+    }
+
     @Test
     public void testUnblockedAcquisition() throws ExecutionException, InterruptedException
     {
@@ -473,7 +487,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
                 // `ci` is our imaginary ongoing anticompaction which makes no progress until after 5s
                 // now we try to start a new AC, which will try to cancel all ongoing compactions
 
-                CompactionManager.instance.getMetrics().beginCompaction(ci);
+                CompactionManager.instance.active.beginCompaction(ci);
                 PendingAntiCompaction pac = new PendingAntiCompaction(prsid, Collections.singleton(cfs), atEndpoint(FULL_RANGE, NO_RANGES), es);
                 ListenableFuture fut = pac.run();
                 try
@@ -487,13 +501,13 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
                 }
                 try
                 {
-                    Assert.assertTrue(ci.hasNext());
+                    assertTrue(ci.hasNext());
                     ci.next();
-                    Assert.fail("CompactionIterator should be abortable");
+                    fail("CompactionIterator should be abortable");
                 }
                 catch (CompactionInterruptedException e)
                 {
-                    CompactionManager.instance.getMetrics().finishCompaction(ci);
+                    CompactionManager.instance.active.finishCompaction(ci);
                     txn.abort();
                     // expected
                 }
@@ -509,7 +523,7 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
                     {
                     }
                 });
-                Assert.assertTrue(cdl.await(1, TimeUnit.MINUTES));
+                assertTrue(cdl.await(1, TimeUnit.MINUTES));
             }
         }
         finally
@@ -518,6 +532,73 @@ public class PendingAntiCompactionTest extends AbstractPendingAntiCompactionTest
         }
     }
 
+    /** Exercises AntiCompactionPredicate (via tryPredicate) with unrepaired, repaired and pending-repair mock sstables. */
+    @Test
+    public void testSSTablePredicateOngoingAntiCompaction()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        cfs.disableAutoCompaction();
+        List<SSTableReader> sstables = new ArrayList<>();
+        for (int i = 1; i <= 10; i++)
+            sstables.add(MockSchema.sstable(i, i * 10, i * 10 + 9, cfs));
+
+        List<SSTableReader> repairedSSTables = new ArrayList<>();
+        for (int i = 1; i <= 10; i++)
+        {
+            SSTableReader repaired = MockSchema.sstable(i + 10, i * 10, i * 10 + 9, cfs);
+            AbstractPendingRepairTest.mutateRepaired(repaired, System.currentTimeMillis());
+            repairedSSTables.add(repaired);
+        }
+
+        List<SSTableReader> pendingSSTables = new ArrayList<>();
+        for (int i = 1; i <= 10; i++)
+        {
+            SSTableReader pending = MockSchema.sstable(i + 20, i * 10, i * 10 + 9, cfs);
+            AbstractPendingRepairTest.mutateRepaired(pending, UUID.randomUUID(), false);
+            pendingSSTables.add(pending);
+        }
+        // NOTE(review): pendingSSTables are never added to cfs below - confirm that this is intended
+        cfs.addSSTables(sstables);
+        cfs.addSSTables(repairedSSTables);
+
+        // if we are compacting the non-repaired non-pending sstables, we should get an error
+        tryPredicate(cfs, sstables, null, true);
+        // make sure we don't try to grab pending or repaired sstables;
+        tryPredicate(cfs, repairedSSTables, sstables, false);
+        tryPredicate(cfs, pendingSSTables, sstables, false);
+    }
+
+    /** Checks AntiCompactionPredicate on cfs' live sstables while {@code compacting} belongs to an active ANTICOMPACTION. */
+    private void tryPredicate(ColumnFamilyStore cfs, List<SSTableReader> compacting, List<SSTableReader> expectedLive, boolean shouldFail)
+    {
+        CompactionInfo.Holder holder = new CompactionInfo.Holder()
+        {
+            public CompactionInfo getCompactionInfo()
+            {
+                return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, UUID.randomUUID(), compacting);
+            }
+        };
+        CompactionManager.instance.active.beginCompaction(holder);
+        try
+        {
+            PendingAntiCompaction.AntiCompactionPredicate predicate = new PendingAntiCompaction.AntiCompactionPredicate(
+                Collections.singleton(new Range<>(new Murmur3Partitioner.LongToken(0), new Murmur3Partitioner.LongToken(100))), UUID.randomUUID());
+            Set<SSTableReader> live = cfs.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
+            if (shouldFail)
+                fail("should fail - we try to grab already anticompacting sstables for anticompaction");
+            assertEquals(new HashSet<>(expectedLive), live); // JUnit order is (expected, actual)
+        }
+        catch (PendingAntiCompaction.SSTableAcquisitionException e)
+        {
+            if (!shouldFail)
+                throw new AssertionError("We should not fail filtering sstables", e); // same failure, but keeps the cause
+        }
+        finally
+        {
+            CompactionManager.instance.active.finishCompaction(holder); // always deregister the fake compaction
+        }
+    }
+
     private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
     {
         RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 9a806c2..1c07760 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -28,6 +28,8 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.cassandra.index.TargetParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -593,8 +595,8 @@ public class CustomCassandraIndex implements Index
     {
         // interrupt in-progress compactions
         Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
-        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
-        CompactionManager.instance.waitForCessation(cfss);
+        CompactionManager.instance.interruptCompactionForCFs(cfss, (sstable) -> true, true);
+        CompactionManager.instance.waitForCessation(cfss, (sstable) -> true);
         indexCfs.keyspace.writeOrder.awaitNewBarrier();
         indexCfs.forceBlockingFlush();
         indexCfs.readOrdering.awaitNewBarrier();
@@ -641,7 +643,8 @@ public class CustomCassandraIndex implements Index
 
             SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
                                                                          Collections.singleton(this),
-                                                                         new ReducingKeyIterator(sstables));
+                                                                         new ReducingKeyIterator(sstables),
+                                                                         ImmutableSet.copyOf(sstables));
             Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
             FBUtilities.waitOnFuture(future);
             indexCfs.forceBlockingFlush();
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 186f0e8..ab8a710 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -111,7 +111,7 @@ public class IndexSummaryManagerTest
     @After
     public void afterTest()
     {
-        for (CompactionInfo.Holder holder : CompactionMetrics.getCompactions())
+        for (CompactionInfo.Holder holder : CompactionManager.instance.active.getCompactions())
         {
             holder.stop();
         }
@@ -659,7 +659,7 @@ public class IndexSummaryManagerTest
         t.join();
 
         assertNotNull("Expected compaction interrupted exception", exception.get());
-        assertTrue("Expected no active compactions", CompactionMetrics.getCompactions().isEmpty());
+        assertTrue("Expected no active compactions", CompactionManager.instance.active.getCompactions().isEmpty());
 
         Set<SSTableReader> beforeRedistributionSSTables = new HashSet<>(sstables);
         Set<SSTableReader> afterCancelSSTables = new HashSet<>(cfs.getLiveSSTables());
diff --git a/test/unit/org/apache/cassandra/schema/MockSchema.java b/test/unit/org/apache/cassandra/schema/MockSchema.java
index 7a6b011..9ca2d6e 100644
--- a/test/unit/org/apache/cassandra/schema/MockSchema.java
+++ b/test/unit/org/apache/cassandra/schema/MockSchema.java
@@ -49,6 +49,8 @@ import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+
 public class MockSchema
 {
     static
@@ -127,7 +129,7 @@ public class MockSchema
         }
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
-                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
+                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, null, false, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
                                                           RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org