You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/02/07 03:11:48 UTC

[7/7] cassandra git commit: Fix consistency of incrementally repaired data

Fix consistency of incrementally repaired data

patch by Blake Eggleston, reviewed by Marcus Eriksson for CASSANDRA-9143


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98d74ed9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98d74ed9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98d74ed9

Branch: refs/heads/trunk
Commit: 98d74ed998706e9e047dc0f7886a1e9b18df3ce9
Parents: 1757e13
Author: Blake Eggleston <bd...@gmail.com>
Authored: Wed Aug 31 15:48:43 2016 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Feb 6 19:11:18 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  10 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  19 +-
 .../compaction/AbstractCompactionStrategy.java  |  10 +-
 .../db/compaction/CompactionManager.java        |  96 +-
 .../compaction/CompactionStrategyManager.java   | 258 +++++-
 .../cassandra/db/compaction/CompactionTask.java |  17 +
 .../DateTieredCompactionStrategy.java           |   6 +
 .../compaction/LeveledCompactionStrategy.java   |   6 +
 .../db/compaction/LeveledManifest.java          |  11 +
 .../db/compaction/PendingRepairManager.java     | 432 +++++++++
 .../cassandra/db/compaction/Scrubber.java       |   8 +-
 .../SizeTieredCompactionStrategy.java           |   7 +
 .../TimeWindowCompactionStrategy.java           |   6 +
 .../cassandra/db/compaction/Upgrader.java       |   7 +-
 .../cassandra/db/compaction/Verifier.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |   3 +
 .../writers/DefaultCompactionWriter.java        |   1 +
 .../writers/MajorLeveledCompactionWriter.java   |   2 +-
 .../writers/MaxSSTableSizeWriter.java           |   1 +
 .../SplittingSizeTieredCompactionWriter.java    |   1 +
 .../org/apache/cassandra/dht/RangeStreamer.java |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   3 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  15 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    |   4 +-
 .../sstable/format/RangeAwareSSTableWriter.java |   9 +-
 .../io/sstable/format/SSTableReader.java        |  11 +
 .../io/sstable/format/SSTableWriter.java        |  17 +-
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |  14 +-
 .../io/sstable/format/big/BigTableWriter.java   |  11 +-
 .../sstable/metadata/IMetadataSerializer.java   |   5 +-
 .../io/sstable/metadata/MetadataCollector.java  |   9 +-
 .../io/sstable/metadata/MetadataSerializer.java |   9 +-
 .../io/sstable/metadata/StatsMetadata.java      |  67 +-
 .../net/IncomingStreamingConnection.java        |   2 +-
 .../cassandra/repair/AnticompactionTask.java    | 174 ----
 .../apache/cassandra/repair/LocalSyncTask.java  |   7 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  30 +-
 .../repair/RepairMessageVerbHandler.java        |  61 +-
 .../apache/cassandra/repair/RepairRunnable.java | 306 ++++---
 .../apache/cassandra/repair/RepairSession.java  |   5 +-
 .../cassandra/repair/StreamingRepairTask.java   |   6 +-
 .../repair/SystemDistributedKeyspace.java       |   3 +-
 .../org/apache/cassandra/repair/Validator.java  |  11 +-
 .../repair/consistent/ConsistentSession.java    | 325 +++++++
 .../repair/consistent/CoordinatorSession.java   | 312 +++++++
 .../repair/consistent/CoordinatorSessions.java  |  95 ++
 .../repair/consistent/LocalSession.java         | 129 +++
 .../repair/consistent/LocalSessionInfo.java     |  67 ++
 .../repair/consistent/LocalSessions.java        | 703 +++++++++++++++
 .../consistent/PendingAntiCompaction.java       | 195 ++++
 .../repair/messages/AnticompactionRequest.java  | 107 ---
 .../cassandra/repair/messages/FailSession.java  |  71 ++
 .../repair/messages/FinalizeCommit.java         |  78 ++
 .../repair/messages/FinalizePromise.java        |  95 ++
 .../repair/messages/FinalizePropose.java        |  78 ++
 .../messages/PrepareConsistentRequest.java      | 124 +++
 .../messages/PrepareConsistentResponse.java     |  94 ++
 .../repair/messages/RepairMessage.java          |  14 +-
 .../cassandra/repair/messages/RepairOption.java |   5 +
 .../repair/messages/StatusRequest.java          |  77 ++
 .../repair/messages/StatusResponse.java         |  90 ++
 .../cassandra/service/ActiveRepairService.java  | 329 ++-----
 .../service/ActiveRepairServiceMBean.java       |  30 +
 .../cassandra/service/CassandraDaemon.java      |   1 +
 .../cassandra/streaming/ConnectionHandler.java  |   3 +-
 .../cassandra/streaming/StreamCoordinator.java  |   8 +-
 .../apache/cassandra/streaming/StreamPlan.java  |   8 +-
 .../cassandra/streaming/StreamReader.java       |  11 +-
 .../cassandra/streaming/StreamResultFuture.java |  10 +-
 .../cassandra/streaming/StreamSession.java      |   8 +-
 .../compress/CompressedStreamReader.java        |   4 +-
 .../streaming/messages/StreamInitMessage.java   |  19 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   9 +
 .../org/apache/cassandra/tools/NodeTool.java    |   1 +
 .../cassandra/tools/SSTableMetadataViewer.java  |   1 +
 .../tools/SSTableRepairedAtSetter.java          |   5 +-
 .../cassandra/tools/nodetool/RepairAdmin.java   | 147 +++
 .../db/RepairedDataTombstonesTest.java          |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../compaction/AbstractPendingRepairTest.java   | 139 +++
 .../db/compaction/AntiCompactionTest.java       |  63 +-
 ...tionManagerGetSSTablesForValidationTest.java | 177 ++++
 ...pactionStrategyManagerPendingRepairTest.java | 291 ++++++
 .../LeveledCompactionStrategyTest.java          |   4 +-
 .../db/compaction/PendingRepairManagerTest.java | 213 +++++
 .../db/lifecycle/LogTransactionTest.java        |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   1 +
 .../cassandra/dht/StreamStateStoreTest.java     |   4 +-
 .../io/sstable/BigTableWriterTest.java          |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   5 +-
 .../io/sstable/SSTableWriterTestBase.java       |   2 +-
 .../format/SSTableFlushObserverTest.java        |   2 +-
 .../metadata/MetadataSerializerTest.java        |  19 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |   8 +-
 .../cassandra/repair/RepairSessionTest.java     |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   2 +-
 .../AbstractConsistentSessionTest.java          |  91 ++
 .../consistent/CoordinatorSessionTest.java      | 498 +++++++++++
 .../consistent/CoordinatorSessionsTest.java     | 208 +++++
 .../repair/consistent/LocalSessionAccessor.java |  62 ++
 .../repair/consistent/LocalSessionTest.java     | 885 +++++++++++++++++++
 .../consistent/PendingAntiCompactionTest.java   | 327 +++++++
 .../RepairMessageSerializationsTest.java        |   7 -
 .../messages/RepairMessageSerializerTest.java   | 115 +++
 .../repair/messages/RepairOptionTest.java       |  16 +-
 .../org/apache/cassandra/schema/MockSchema.java |   2 +-
 .../service/ActiveRepairServiceTest.java        | 112 +--
 .../streaming/StreamTransferTaskTest.java       |   6 +-
 114 files changed, 7235 insertions(+), 990 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b339e6e..71513b0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix consistency of incrementally repaired data (CASSANDRA-9143)
  * Increase commitlog version (CASSANDRA-13161)
  * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
  * Refactor ColumnCondition (CASSANDRA-12981)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1c4bfdc..ee1fd6d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -34,7 +34,12 @@ Upgrading
       add new nodes to a datacenter before they can set set ALTER or CREATE
       keyspace replication policies using that datacenter. Existing keyspaces
       will continue to operate, but CREATE and ALTER will validate that all
-      datacenters specified exist in the cluster. 
+      datacenters specified exist in the cluster.
+    - Cassandra 4.0 fixes a problem with incremental repair which caused repaired
+      data to be inconsistent between nodes. The fix changes the behavior of both
+      full and incremental repairs. For full repairs, data is no longer marked
+      repaired. For incremental repairs, anticompaction is run at the beginning
+      of the repair, instead of at the end.
 
 3.10
 ====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 039dc33..dceb41d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -520,15 +520,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata().comparator).sstableLevel(sstableLevel);
-        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, txn);
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
     {
-        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
+        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, metadataCollector, header, indexManager.listIndexes(), txn);
     }
 
     public boolean supportsEarlyOpen()
@@ -1888,7 +1888,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
+    public Refs<SSTableReader> getSnapshotSSTableReaders(String tag) throws IOException
     {
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 01e741d..c0a1701 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -508,6 +508,7 @@ public class Memtable implements Comparable<Memtable>
             return cfs.createSSTableMultiWriter(descriptor,
                                                 toFlush.size(),
                                                 ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                ActiveRepairService.NO_PENDING_REPAIR,
                                                 sstableMetadataCollector,
                                                 new SerializationHeader(true, cfs.metadata(), columns, stats), txn);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e826dd8..058b378 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -101,6 +101,7 @@ public final class SystemKeyspace
     public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress";
     public static final String BUILT_VIEWS = "built_views";
     public static final String PREPARED_STATEMENTS = "prepared_statements";
+    public static final String REPAIRS = "repairs";
 
     public static final TableMetadata Batches =
         parse(BATCHES,
@@ -291,6 +292,21 @@ public final class SystemKeyspace
               + "PRIMARY KEY ((prepared_id)))")
               .build();
 
+    private static final TableMetadata Repairs =
+        parse(REPAIRS,
+              "repairs",
+              "CREATE TABLE %s ("
+              + "parent_id timeuuid, "
+              + "started_at timestamp, "
+              + "last_update timestamp, "
+              + "repaired_at timestamp, "
+              + "state int, "
+              + "coordinator inet, "
+              + "participants set<inet>, "
+              + "ranges set<blob>, "
+              + "cfids set<uuid>, "
+              + "PRIMARY KEY (parent_id))").build();
+
     private static TableMetadata.Builder parse(String table, String description, String cql)
     {
         return CreateTableStatement.parse(format(cql, table), SchemaConstants.SYSTEM_KEYSPACE_NAME)
@@ -322,7 +338,8 @@ public final class SystemKeyspace
                          TransferredRanges,
                          ViewsBuildsInProgress,
                          BuiltViews,
-                         PreparedStatements);
+                         PreparedStatements,
+                         Repairs);
     }
 
     private static Functions functions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index fccad19..74c6419 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
@@ -331,6 +332,12 @@ public abstract class AbstractCompactionStrategy
 
     public abstract void removeSSTable(SSTableReader sstable);
 
+    /**
+     * Returns the sstables managed by this strategy instance
+     */
+    @VisibleForTesting
+    protected abstract Set<SSTableReader> getSSTables();
+
     public static class ScannerList implements AutoCloseable
     {
         public final List<ISSTableScanner> scanners;
@@ -579,12 +586,13 @@ public abstract class AbstractCompactionStrategy
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                        long keyCount,
                                                        long repairedAt,
+                                                       UUID pendingRepair,
                                                        MetadataCollector meta,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
-        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, cfs.metadata, meta, header, indexes, txn);
     }
 
     public boolean supportsEarlyOpen()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 0feb236..5a1313b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -63,6 +62,7 @@ import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -70,6 +70,7 @@ import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static java.util.Collections.singleton;
@@ -565,10 +566,11 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
-                                          final Collection<Range<Token>> ranges,
-                                          final Refs<SSTableReader> sstables,
-                                          final long repairedAt,
-                                          final UUID parentRepairSession)
+                                                    final Collection<Range<Token>> ranges,
+                                                    final Refs<SSTableReader> sstables,
+                                                    final long repairedAt,
+                                                    final UUID pendingRepair,
+                                                    final UUID parentRepairSession)
     {
         Runnable runnable = new WrappedRunnable()
         {
@@ -588,7 +590,7 @@ public class CompactionManager implements CompactionManagerMBean
                     sstables.release(compactedSSTables);
                     modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                 }
-                performAnticompaction(cfs, ranges, sstables, modifier, repairedAt, parentRepairSession);
+                performAnticompaction(cfs, ranges, sstables, modifier, repairedAt, pendingRepair, parentRepairSession);
             }
         };
 
@@ -606,6 +608,32 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     /**
+     * Splits the given token ranges of the given sstables into a pending repair silo
+     */
+    public ListenableFuture<?> submitPendingAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Refs<SSTableReader> sstables, LifecycleTransaction txn, UUID sessionId)
+    {
+        Runnable runnable = new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                performAnticompaction(cfs, ranges, sstables, txn, ActiveRepairService.UNREPAIRED_SSTABLE, sessionId, sessionId);
+            }
+        };
+
+        ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null);
+        try
+        {
+            executor.submitIfRunning(task, "pending anticompaction");
+            return task;
+        }
+        finally
+        {
+            if (task.isCancelled())
+                sstables.release();
+        }
+    }
+
+    /**
      * Make sure the {validatedForRepair} are marked for compaction before calling this.
      *
      * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
@@ -622,6 +650,7 @@ public class CompactionManager implements CompactionManagerMBean
                                       Refs<SSTableReader> validatedForRepair,
                                       LifecycleTransaction txn,
                                       long repairedAt,
+                                      UUID pendingRepair,
                                       UUID parentRepairSession) throws InterruptedException, IOException
     {
         logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
@@ -654,7 +683,7 @@ public class CompactionManager implements CompactionManagerMBean
                     if (r.contains(sstableRange))
                     {
                         logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
-                        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+                        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
                         sstable.reloadSSTableMetadata();
                         mutatedRepairStatuses.add(sstable);
                         if (!wasRepairedBefore.get(sstable))
@@ -682,7 +711,7 @@ public class CompactionManager implements CompactionManagerMBean
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
-                doAntiCompaction(cfs, ranges, txn, repairedAt);
+                doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);
             txn.finish();
         }
         finally
@@ -1092,10 +1121,10 @@ public class CompactionManager implements CompactionManagerMBean
              CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
-            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn));
+            StatsMetadata metadata = sstable.getSSTableMetadata();
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, txn));
             long lastBytesScanned = 0;
 
-
             while (ci.hasNext())
             {
                 if (ci.isStopRequested())
@@ -1236,6 +1265,7 @@ public class CompactionManager implements CompactionManagerMBean
                                              File compactionFileLocation,
                                              long expectedBloomFilterSize,
                                              long repairedAt,
+                                             UUID pendingRepair,
                                              SSTableReader sstable,
                                              LifecycleTransaction txn)
     {
@@ -1245,6 +1275,7 @@ public class CompactionManager implements CompactionManagerMBean
                                     cfs.newSSTableDescriptor(compactionFileLocation),
                                     expectedBloomFilterSize,
                                     repairedAt,
+                                    pendingRepair,
                                     sstable.getSSTableLevel(),
                                     sstable.header,
                                     cfs.indexManager.listIndexes(),
@@ -1255,6 +1286,7 @@ public class CompactionManager implements CompactionManagerMBean
                                                               File compactionFileLocation,
                                                               int expectedBloomFilterSize,
                                                               long repairedAt,
+                                                              UUID pendingRepair,
                                                               Collection<SSTableReader> sstables,
                                                               LifecycleTransaction txn)
     {
@@ -1277,6 +1309,7 @@ public class CompactionManager implements CompactionManagerMBean
         return SSTableWriter.create(cfs.newSSTableDescriptor(compactionFileLocation),
                                     (long) expectedBloomFilterSize,
                                     repairedAt,
+                                    pendingRepair,
                                     cfs.metadata,
                                     new MetadataCollector(sstables, cfs.metadata().comparator, minLevel),
                                     SerializationHeader.make(cfs.metadata(), sstables),
@@ -1320,7 +1353,7 @@ public class CompactionManager implements CompactionManagerMBean
                 // If there is a snapshot created for the session then read from there.
                 // note that we populate the parent repair session when creating the snapshot, meaning the sstables in the snapshot are the ones we
                 // are supposed to validate.
-                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+                sstables = cfs.getSnapshotSSTableReaders(snapshotName);
 
 
                 // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
@@ -1331,8 +1364,11 @@ public class CompactionManager implements CompactionManagerMBean
             }
             else
             {
-                // flush first so everyone is validating data that is as similar as possible
-                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                if (!validator.isConsistent)
+                {
+                    // flush first so everyone is validating data that is as similar as possible
+                    StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                }
                 sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
                     return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
@@ -1422,7 +1458,8 @@ public class CompactionManager implements CompactionManagerMBean
         return tree;
     }
 
-    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    @VisibleForTesting
+    synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
     {
         Refs<SSTableReader> sstables;
 
@@ -1430,11 +1467,20 @@ public class CompactionManager implements CompactionManagerMBean
         if (prs == null)
             return null;
         Set<SSTableReader> sstablesToValidate = new HashSet<>();
-        if (prs.isGlobal)
-            prs.markSSTablesRepairing(cfs.metadata.id, validator.desc.parentSessionId);
-        // note that we always grab all existing sstables for this - if we were to just grab the ones that
-        // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
-        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
+
+        com.google.common.base.Predicate<SSTableReader> predicate;
+        if (validator.isConsistent)
+        {
+            predicate = s -> validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
+        }
+        else
+        {
+            // note that we always grab all existing sstables for this - if we were to just grab the ones that
+            // were marked as repairing, we would miss any ranges that were compacted away and this would cause us to overstream
+            predicate = (s) -> !prs.isIncremental || !s.isRepaired();
+        }
+
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
         {
             for (SSTableReader sstable : sstableCandidates.sstables)
             {
@@ -1464,7 +1510,7 @@ public class CompactionManager implements CompactionManagerMBean
      * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
      * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
      */
-    private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
+    private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt, UUID pendingRepair)
     {
         logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
 
@@ -1476,7 +1522,7 @@ public class CompactionManager implements CompactionManagerMBean
         {
             try (LifecycleTransaction txn = repaired.split(sstableGroup))
             {
-                int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt);
+                int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt, pendingRepair);
                 antiCompactedSSTableCount += antiCompacted;
             }
         }
@@ -1486,7 +1532,7 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
-                             LifecycleTransaction anticompactionGroup, long repairedAt)
+                             LifecycleTransaction anticompactionGroup, long repairedAt, UUID pendingRepair)
     {
         long groupMaxDataAge = -1;
 
@@ -1520,8 +1566,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
-            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup));
-            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup));
+            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, pendingRepair, sstableAsSet, anticompactionGroup));
+            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, null, sstableAsSet, anticompactionGroup));
             Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
             while (ci.hasNext())
             {
@@ -1669,7 +1715,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         @Override
-        public Predicate<Long> getPurgeEvaluator(DecoratedKey key)
+        public java.util.function.Predicate<Long> getPurgeEvaluator(DecoratedKey key)
         {
             /*
              * The main reason we always purge is that including gcable tombstone would mean that the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 71b160a..0ccdb49 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,14 +21,16 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.function.Supplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.index.Index;
-import com.google.common.primitives.Ints;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,12 +53,13 @@ import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Manages the compaction strategies.
  *
- * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
- * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ * For each directory, a separate compaction strategy instance for both repaired and unrepaired data, and also one instance
+ * for each pending repair. This is done to keep the different sets of sstables completely separate.
  */
 
 public class CompactionStrategyManager implements INotificationConsumer
@@ -66,6 +69,7 @@ public class CompactionStrategyManager implements INotificationConsumer
     private final ColumnFamilyStore cfs;
     private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+    private final List<PendingRepairManager> pendingRepairs = new ArrayList<>();
     private volatile boolean enabled = true;
     private volatile boolean isActive = true;
     private volatile CompactionParams params;
@@ -83,6 +87,7 @@ public class CompactionStrategyManager implements INotificationConsumer
     private volatile CompactionParams schemaCompactionParams;
     private Directories.DataDirectory[] locations;
 
+
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
         cfs.getTracker().subscribe(this);
@@ -110,23 +115,61 @@ public class CompactionStrategyManager implements INotificationConsumer
                 return null;
 
             maybeReload(cfs.metadata());
-            List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 
-            strategies.addAll(repaired);
-            strategies.addAll(unrepaired);
-            Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
-            for (AbstractCompactionStrategy strategy : strategies)
+            // first try to promote/demote sstables from completed repairs
+            ArrayList<Pair<Integer, PendingRepairManager>> pendingRepairManagers = new ArrayList<>(pendingRepairs.size());
+            for (PendingRepairManager pendingRepair : pendingRepairs)
+            {
+                int numPending = pendingRepair.getNumPendingRepairFinishedTasks();
+                if (numPending > 0)
+                {
+                    pendingRepairManagers.add(Pair.create(numPending, pendingRepair));
+                }
+            }
+            if (!pendingRepairManagers.isEmpty())
+            {
+                pendingRepairManagers.sort((x, y) -> y.left - x.left);
+                for (Pair<Integer, PendingRepairManager> pair : pendingRepairManagers)
+                {
+                    AbstractCompactionTask task = pair.right.getNextRepairFinishedTask();
+                    if (task != null)
+                    {
+                        return task;
+                    }
+                }
+            }
+
+            // sort compaction task suppliers by remaining tasks descending
+            ArrayList<Pair<Integer, Supplier<AbstractCompactionTask>>> sortedSuppliers = new ArrayList<>(repaired.size() + unrepaired.size() + 1);
+
+            for (AbstractCompactionStrategy strategy : repaired)
+                sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
+
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
+
+            for (PendingRepairManager pending : pendingRepairs)
+                sortedSuppliers.add(Pair.create(pending.getMaxEstimatedRemainingTasks(), () -> pending.getNextBackgroundTask(gcBefore)));
+
+            sortedSuppliers.sort((x, y) -> y.left - x.left);
+
+            // return the first non-null task
+            AbstractCompactionTask task;
+            Iterator<Supplier<AbstractCompactionTask>> suppliers = Iterables.transform(sortedSuppliers, p -> p.right).iterator();
+            assert suppliers.hasNext();
+
+            do
             {
-                AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
-                if (task != null)
-                    return task;
+                task = suppliers.next().get();
             }
+            while (suppliers.hasNext() && task == null);
+
+            return task;
         }
         finally
         {
             readLock.unlock();
         }
-        return null;
     }
 
     public boolean isEnabled()
@@ -183,6 +226,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             }
             repaired.forEach(AbstractCompactionStrategy::startup);
             unrepaired.forEach(AbstractCompactionStrategy::startup);
+            pendingRepairs.forEach(PendingRepairManager::startup);
         }
         finally
         {
@@ -190,6 +234,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         repaired.forEach(AbstractCompactionStrategy::startup);
         unrepaired.forEach(AbstractCompactionStrategy::startup);
+        pendingRepairs.forEach(PendingRepairManager::startup);
         if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
             compactionLogger.enable();
     }
@@ -207,7 +252,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                return pendingRepairs.get(index).getOrCreate(sstable);
+            else if (sstable.isRepaired())
                 return repaired.get(index);
             else
                 return unrepaired.get(index);
@@ -239,22 +286,60 @@ public class CompactionStrategyManager implements INotificationConsumer
         Directories.DataDirectory[] directories = locations.getWriteableLocations();
         List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
         if (boundaries == null)
-        {
-            // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.length; i++)
-            {
-                Directories.DataDirectory directory = directories[i];
-                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                    return i;
-            }
-            return 0;
-        }
+            return getCompactionStrategyIndex(locations, sstable.descriptor);
 
         int pos = Collections.binarySearch(boundaries, sstable.first);
         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
         return -pos - 1;
     }
 
+    /**
+     * get the index for the descriptor based on the existing directories
+     * @param locations
+     * @param descriptor
+     * @return
+     */
+    private static int getCompactionStrategyIndex(Directories locations, Descriptor descriptor)
+    {
+         Directories.DataDirectory[] directories = locations.getWriteableLocations();
+         // try to figure out location based on sstable directory:
+         for (int i = 0; i < directories.length; i++)
+         {
+             Directories.DataDirectory directory = directories[i];
+             if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                 return i;
+         }
+         return 0;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getRepaired()
+    {
+        return repaired;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getUnrepaired()
+    {
+        return unrepaired;
+    }
+
+    @VisibleForTesting
+    List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
+    {
+        List<AbstractCompactionStrategy> strategies = new ArrayList<>(pendingRepairs.size());
+        pendingRepairs.forEach(p -> strategies.add(p.get(sessionID)));
+        return strategies;
+    }
+
+    @VisibleForTesting
+    Set<UUID> pendingRepairs()
+    {
+        Set<UUID> ids = new HashSet<>();
+        pendingRepairs.forEach(p -> ids.addAll(p.getSessions()));
+        return ids;
+    }
+
     public void shutdown()
     {
         writeLock.lock();
@@ -263,6 +348,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             isActive = false;
             repaired.forEach(AbstractCompactionStrategy::shutdown);
             unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+            pendingRepairs.forEach(PendingRepairManager::shutdown);
             compactionLogger.disable();
         }
         finally
@@ -332,6 +418,9 @@ public class CompactionStrategyManager implements INotificationConsumer
                     count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
                 for (AbstractCompactionStrategy strategy : unrepaired)
                     count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
+                for (PendingRepairManager pendingManager : pendingRepairs)
+                    for (AbstractCompactionStrategy strategy : pendingManager.getStrategies())
+                        count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
                 return count;
             }
         }
@@ -377,6 +466,11 @@ public class CompactionStrategyManager implements INotificationConsumer
                     int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
                     res = sumArrays(res, unrepairedCountPerLevel);
                 }
+                for (PendingRepairManager pending : pendingRepairs)
+                {
+                    int[] pendingRepairCountPerLevel = pending.getSSTableCountPerLevel();
+                    res = sumArrays(res, pendingRepairCountPerLevel);
+                }
                 return res;
             }
         }
@@ -387,7 +481,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         return null;
     }
 
-    private static int[] sumArrays(int[] a, int[] b)
+    static int[] sumArrays(int[] a, int[] b)
     {
         int[] res = new int[Math.max(a.length, b.length)];
         for (int i = 0; i < res.length; i++)
@@ -451,6 +545,8 @@ public class CompactionStrategyManager implements INotificationConsumer
         Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
         int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
 
+        List<Set<SSTableReader>> pendingRemoved = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
         List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
         List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
         List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
@@ -458,6 +554,8 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         for (int i = 0; i < locationSize; i++)
         {
+            pendingRemoved.add(new HashSet<>());
+            pendingAdded.add(new HashSet<>());
             repairedRemoved.add(new HashSet<>());
             repairedAdded.add(new HashSet<>());
             unrepairedRemoved.add(new HashSet<>());
@@ -467,7 +565,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         for (SSTableReader sstable : removed)
         {
             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                pendingRemoved.get(i).add(sstable);
+            else if (sstable.isRepaired())
                 repairedRemoved.get(i).add(sstable);
             else
                 unrepairedRemoved.get(i).add(sstable);
@@ -475,7 +575,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         for (SSTableReader sstable : added)
         {
             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-            if (sstable.isRepaired())
+            if (sstable.isPendingRepair())
+                pendingAdded.get(i).add(sstable);
+            else if (sstable.isRepaired())
                 repairedAdded.get(i).add(sstable);
             else
                 unrepairedAdded.get(i).add(sstable);
@@ -486,6 +588,17 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (int i = 0; i < locationSize; i++)
             {
+
+                if (!pendingRemoved.get(i).isEmpty())
+                {
+                    pendingRepairs.get(i).replaceSSTables(pendingRemoved.get(i), pendingAdded.get(i));
+                }
+                else
+                {
+                    PendingRepairManager pendingManager = pendingRepairs.get(i);
+                    pendingAdded.get(i).forEach(s -> pendingManager.addSSTable(s));
+                }
+
                 if (!repairedRemoved.get(i).isEmpty())
                     repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
                 else
@@ -512,13 +625,21 @@ public class CompactionStrategyManager implements INotificationConsumer
             for (SSTableReader sstable : sstables)
             {
                 int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-                if (sstable.isRepaired())
+                if (sstable.isPendingRepair())
+                {
+                    pendingRepairs.get(index).addSSTable(sstable);
+                    unrepaired.get(index).removeSSTable(sstable);
+                    repaired.get(index).removeSSTable(sstable);
+                }
+                else if (sstable.isRepaired())
                 {
+                    pendingRepairs.get(index).removeSSTable(sstable);
                     unrepaired.get(index).removeSSTable(sstable);
                     repaired.get(index).addSSTable(sstable);
                 }
                 else
                 {
+                    pendingRepairs.get(index).removeSSTable(sstable);
                     repaired.get(index).removeSSTable(sstable);
                     unrepaired.get(index).addSSTable(sstable);
                 }
@@ -574,6 +695,8 @@ public class CompactionStrategyManager implements INotificationConsumer
                 repaired.forEach(AbstractCompactionStrategy::enable);
             if (unrepaired != null)
                 unrepaired.forEach(AbstractCompactionStrategy::enable);
+            if (pendingRepairs != null)
+                pendingRepairs.forEach(PendingRepairManager::enable);
             // enable this last to make sure the strategies are ready to get calls.
             enabled = true;
         }
@@ -594,6 +717,8 @@ public class CompactionStrategyManager implements INotificationConsumer
                 repaired.forEach(AbstractCompactionStrategy::disable);
             if (unrepaired != null)
                 unrepaired.forEach(AbstractCompactionStrategy::disable);
+            if (pendingRepairs != null)
+                pendingRepairs.forEach(PendingRepairManager::disable);
         }
         finally
         {
@@ -613,21 +738,27 @@ public class CompactionStrategyManager implements INotificationConsumer
     public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
         assert repaired.size() == unrepaired.size();
+        assert repaired.size() == pendingRepairs.size();
+        List<Set<SSTableReader>> pendingSSTables = new ArrayList<>();
         List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
         List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 
         for (int i = 0; i < repaired.size(); i++)
         {
+            pendingSSTables.add(new HashSet<>());
             repairedSSTables.add(new HashSet<>());
             unrepairedSSTables.add(new HashSet<>());
         }
 
         for (SSTableReader sstable : sstables)
         {
-            if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+            int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            if (sstable.isPendingRepair())
+                pendingSSTables.get(idx).add(sstable);
+            else if (sstable.isRepaired())
+                repairedSSTables.get(idx).add(sstable);
             else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                unrepairedSSTables.get(idx).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -635,6 +766,11 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
+            for (int i = 0; i < pendingSSTables.size(); i++)
+            {
+                if (!pendingSSTables.get(i).isEmpty())
+                    scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), ranges));
+            }
             for (int i = 0; i < repairedSSTables.size(); i++)
             {
                 if (!repairedSSTables.get(i).isEmpty())
@@ -703,12 +839,15 @@ public class CompactionStrategyManager implements INotificationConsumer
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
         int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+        boolean isPending = firstSSTable.isPendingRepair();
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
             if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            if (isPending != sstable.isPendingRepair())
+                throw new UnsupportedOperationException("You can't compact sstables pending for repair with non-pending ones");
         }
     }
 
@@ -739,6 +878,13 @@ public class CompactionStrategyManager implements INotificationConsumer
                         if (task != null)
                             tasks.addAll(task);
                     }
+
+                    for (PendingRepairManager pending : pendingRepairs)
+                    {
+                        Collection<AbstractCompactionTask> pendingRepairTasks = pending.getMaximalTasks(gcBefore, splitOutput);
+                        if (pendingRepairTasks != null)
+                            tasks.addAll(pendingRepairTasks);
+                    }
                 }
                 finally
                 {
@@ -768,13 +914,16 @@ public class CompactionStrategyManager implements INotificationConsumer
         try
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
-                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
+                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair())
                                                                          .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
-                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
+                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair())
                                                                            .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
 
+            Map<Integer, List<SSTableReader>> pendingSSTables = sstables.stream()
+                                                                        .filter(s -> !s.isMarkedSuspect() && s.isPendingRepair())
+                                                                        .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
                 ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
@@ -782,6 +931,9 @@ public class CompactionStrategyManager implements INotificationConsumer
             for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
                 ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
 
+            for (Map.Entry<Integer, List<SSTableReader>> group : pendingSSTables.entrySet())
+                ret.addAll(pendingRepairs.get(group.getKey()).createUserDefinedTasks(group.getValue(), gcBefore));
+
             return ret;
         }
         finally
@@ -808,11 +960,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-
             for (AbstractCompactionStrategy strategy : repaired)
                 tasks += strategy.getEstimatedRemainingTasks();
             for (AbstractCompactionStrategy strategy : unrepaired)
                 tasks += strategy.getEstimatedRemainingTasks();
+            for (PendingRepairManager pending : pendingRepairs)
+                tasks += pending.getEstimatedRemainingTasks();
         }
         finally
         {
@@ -844,7 +997,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            return Arrays.asList(repaired, unrepaired);
+            List<AbstractCompactionStrategy> pending = new ArrayList<>();
+            pendingRepairs.forEach(p -> pending.addAll(p.getStrategies()));
+            return Arrays.asList(repaired, unrepaired, pending);
         }
         finally
         {
@@ -875,8 +1030,10 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         repaired.forEach(AbstractCompactionStrategy::shutdown);
         unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        pendingRepairs.forEach(PendingRepairManager::shutdown);
         repaired.clear();
         unrepaired.clear();
+        pendingRepairs.clear();
 
         if (cfs.getPartitioner().splitter().isPresent())
         {
@@ -885,12 +1042,14 @@ public class CompactionStrategyManager implements INotificationConsumer
             {
                 repaired.add(cfs.createCompactionStrategyInstance(params));
                 unrepaired.add(cfs.createCompactionStrategyInstance(params));
+                pendingRepairs.add(new PendingRepairManager(cfs, params));
             }
         }
         else
         {
             repaired.add(cfs.createCompactionStrategyInstance(params));
             unrepaired.add(cfs.createCompactionStrategyInstance(params));
+            pendingRepairs.add(new PendingRepairManager(cfs, params));
         }
         this.params = params;
     }
@@ -908,6 +1067,7 @@ public class CompactionStrategyManager implements INotificationConsumer
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                        long keyCount,
                                                        long repairedAt,
+                                                       UUID pendingRepair,
                                                        MetadataCollector collector,
                                                        SerializationHeader header,
                                                        Collection<Index> indexes,
@@ -916,14 +1076,16 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
-            {
-                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
-            }
+            // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
+            int index = cfs.getPartitioner().splitter().isPresent()
+                        ? getCompactionStrategyIndex(getDirectories(), descriptor)
+                        : 0;
+            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
+                return pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, header, indexes, txn);
+            else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+                return unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
             else
-            {
-                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
-            }
+                return repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
         }
         finally
         {
@@ -951,6 +1113,14 @@ public class CompactionStrategyManager implements INotificationConsumer
             {
                 return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
             }
+            for (int i = 0; i < pendingRepairs.size(); i++)
+            {
+                PendingRepairManager pending = pendingRepairs.get(i);
+                if (pending.hasStrategy(strategy))
+                {
+                    return Collections.singletonList(locations[i].location.getAbsolutePath());
+                }
+            }
         }
         List<String> folders = new ArrayList<>(locations.length);
         for (Directories.DataDirectory location : locations)
@@ -964,4 +1134,10 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         return repaired.get(0).supportsEarlyOpen();
     }
+
+    @VisibleForTesting
+    List<PendingRepairManager> getPendingRepairManagers()
+    {
+        return pendingRepairs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 62efa3d..789de1e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -318,6 +319,22 @@ public class CompactionTask extends AbstractCompactionTask
         return minRepairedAt;
     }
 
+    public static UUID getPendingRepair(Set<SSTableReader> sstables)
+    {
+        if (sstables.isEmpty())
+        {
+            return ActiveRepairService.NO_PENDING_REPAIR;
+        }
+        Set<UUID> ids = new HashSet<>();
+        for (SSTableReader sstable: sstables)
+            ids.add(sstable.getSSTableMetadata().pendingRepair);
+
+        if (ids.size() != 1)
+            throw new RuntimeException(String.format("Attempting to compact pending repair sstables with sstables from other repair, or sstables not pending repair: %s", ids));
+
+        return ids.iterator().next();
+    }
+
     /*
     Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
     there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 729ddc0..ae35dcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -221,6 +221,12 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
     /**
      * A target time span used for bucketing SSTables based on timestamps.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9495582..4f11a03 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -339,6 +339,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         manifest.remove(sstable);
     }
 
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        return manifest.getSSTables();
+    }
+
     // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the
     // same level (e.g. non overlapping) - see #4142
     private static class LeveledScanner extends AbstractIterator<UnfilteredRowIterator> implements ISSTableScanner

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3d118de..0c53812 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -23,6 +23,7 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -526,6 +527,16 @@ public class LeveledManifest
         return level;
     }
 
+    public synchronized Set<SSTableReader> getSSTables()
+    {
+        ImmutableSet.Builder<SSTableReader> builder = ImmutableSet.builder();
+        for (List<SSTableReader> sstables : generations)
+        {
+            builder.addAll(sstables);
+        }
+        return builder.build();
+    }
+
     private static Set<SSTableReader> overlapping(Collection<SSTableReader> candidates, Iterable<SSTableReader> others)
     {
         assert !candidates.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
new file mode 100644
index 0000000..a270ead
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Companion to CompactionStrategyManager which manages the sstables marked pending repair.
+ *
+ * SSTables are classified as pending repair by the anti-compaction performed at the beginning
+ * of an incremental repair, or when they're streamed in with a pending repair id. This prevents
+ * unrepaired / pending repaired sstables from being compacted together. Once the repair session
+ * has completed, or failed, sstables will be re-classified as part of the compaction process.
+ */
+class PendingRepairManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(PendingRepairManager.class);
+
+    private final ColumnFamilyStore cfs;
+    private final CompactionParams params;
+    private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of();
+
+    PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
+    {
+        this.cfs = cfs;
+        this.params = params;
+    }
+
+    private ImmutableMap.Builder<UUID, AbstractCompactionStrategy> mapBuilder()
+    {
+        return ImmutableMap.builder();
+    }
+
+    AbstractCompactionStrategy get(UUID id)
+    {
+        return strategies.get(id);
+    }
+
+    AbstractCompactionStrategy get(SSTableReader sstable)
+    {
+        assert sstable.isPendingRepair();
+        return get(sstable.getSSTableMetadata().pendingRepair);
+    }
+
+    AbstractCompactionStrategy getOrCreate(UUID id)
+    {
+        assert id != null;
+        AbstractCompactionStrategy strategy = get(id);
+        if (strategy == null)
+        {
+            synchronized (this)
+            {
+                strategy = get(id);
+
+                if (strategy == null)
+                {
+                    logger.debug("Creating {}.{} compaction strategy for pending repair: {}", cfs.metadata.keyspace, cfs.metadata.name, id);
+                    strategy = cfs.createCompactionStrategyInstance(params);
+                    strategies = mapBuilder().putAll(strategies).put(id, strategy).build();
+                }
+            }
+        }
+        return strategy;
+    }
+
+    AbstractCompactionStrategy getOrCreate(SSTableReader sstable)
+    {
+        assert sstable.isPendingRepair();
+        return getOrCreate(sstable.getSSTableMetadata().pendingRepair);
+    }
+
+    private synchronized void removeSession(UUID sessionID)
+    {
+        if (!strategies.containsKey(sessionID))
+            return;
+
+        logger.debug("Removing compaction strategy for pending repair {} on  {}.{}", sessionID, cfs.metadata.keyspace, cfs.metadata.name);
+        strategies = ImmutableMap.copyOf(Maps.filterKeys(strategies, k -> !k.equals(sessionID)));
+    }
+
+    synchronized void removeSSTable(SSTableReader sstable)
+    {
+        for (AbstractCompactionStrategy strategy : strategies.values())
+            strategy.removeSSTable(sstable);
+    }
+
+    synchronized void addSSTable(SSTableReader sstable)
+    {
+        getOrCreate(sstable).addSSTable(sstable);
+    }
+
+    synchronized void replaceSSTables(Set<SSTableReader> removed, Set<SSTableReader> added)
+    {
+        if (removed.isEmpty() && added.isEmpty())
+            return;
+
+        // left=removed, right=added
+        Map<UUID, Pair<Set<SSTableReader>, Set<SSTableReader>>> groups = new HashMap<>();
+        for (SSTableReader sstable : removed)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            if (!groups.containsKey(sessionID))
+            {
+                groups.put(sessionID, Pair.create(new HashSet<>(), new HashSet<>()));
+            }
+            groups.get(sessionID).left.add(sstable);
+        }
+
+        for (SSTableReader sstable : added)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            if (!groups.containsKey(sessionID))
+            {
+                groups.put(sessionID, Pair.create(new HashSet<>(), new HashSet<>()));
+            }
+            groups.get(sessionID).right.add(sstable);
+        }
+
+        for (Map.Entry<UUID, Pair<Set<SSTableReader>, Set<SSTableReader>>> entry : groups.entrySet())
+        {
+            AbstractCompactionStrategy strategy = getOrCreate(entry.getKey());
+            Set<SSTableReader> groupRemoved = entry.getValue().left;
+            Set<SSTableReader> groupAdded = entry.getValue().right;
+
+            if (!groupRemoved.isEmpty())
+                strategy.replaceSSTables(groupRemoved, groupAdded);
+            else
+                strategy.addSSTables(groupAdded);
+        }
+    }
+
+    synchronized void startup()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::startup);
+    }
+
+    synchronized void shutdown()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::shutdown);
+    }
+
+    synchronized void enable()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::enable);
+    }
+
+    synchronized void disable()
+    {
+        strategies.values().forEach(AbstractCompactionStrategy::disable);
+    }
+
+    private int getEstimatedRemainingTasks(UUID sessionID, AbstractCompactionStrategy strategy)
+    {
+        if (canCleanup(sessionID))
+        {
+            return 0;
+        }
+        else
+        {
+            return strategy.getEstimatedRemainingTasks();
+        }
+    }
+
+    int getEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : strategies.entrySet())
+        {
+            tasks += getEstimatedRemainingTasks(entry.getKey(), entry.getValue());
+        }
+        return tasks;
+    }
+
+    /**
+     * @return the highest max remaining tasks of all contained compaction strategies
+     */
+    int getMaxEstimatedRemainingTasks()
+    {
+        int tasks = 0;
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : strategies.entrySet())
+        {
+            tasks = Math.max(tasks, getEstimatedRemainingTasks(entry.getKey(), entry.getValue()));
+        }
+        return tasks;
+    }
+
+    private RepairFinishedCompactionTask getRepairFinishedCompactionTask(UUID sessionID)
+    {
+        Set<SSTableReader> sstables = get(sessionID).getSSTables();
+        long repairedAt = ActiveRepairService.instance.consistent.local.getFinalSessionRepairedAt(sessionID);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        return txn == null ? null : new RepairFinishedCompactionTask(cfs, txn, sessionID, repairedAt);
+    }
+
+    synchronized int getNumPendingRepairFinishedTasks()
+    {
+        int count = 0;
+        for (UUID sessionID : strategies.keySet())
+        {
+            if (canCleanup(sessionID))
+            {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    synchronized AbstractCompactionTask getNextRepairFinishedTask()
+    {
+        for (UUID sessionID : strategies.keySet())
+        {
+            if (canCleanup(sessionID))
+            {
+                return getRepairFinishedCompactionTask(sessionID);
+            }
+        }
+        return null;
+    }
+
+    synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        if (strategies.isEmpty())
+            return null;
+
+        Map<UUID, Integer> numTasks = new HashMap<>(strategies.size());
+        ArrayList<UUID> sessions = new ArrayList<>(strategies.size());
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : strategies.entrySet())
+        {
+            if (canCleanup(entry.getKey()))
+            {
+                continue;
+            }
+            numTasks.put(entry.getKey(), getEstimatedRemainingTasks(entry.getKey(), entry.getValue()));
+            sessions.add(entry.getKey());
+        }
+
+        // we want the session with the most compactions at the head of the list
+        sessions.sort((o1, o2) -> numTasks.get(o2) - numTasks.get(o1));
+
+        UUID sessionID = sessions.get(0);
+        return get(sessionID).getNextBackgroundTask(gcBefore);
+    }
+
+    synchronized Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput)
+    {
+        if (strategies.isEmpty())
+            return null;
+
+        List<AbstractCompactionTask> maximalTasks = new ArrayList<>(strategies.size());
+        for (Map.Entry<UUID, AbstractCompactionStrategy> entry : strategies.entrySet())
+        {
+            if (canCleanup(entry.getKey()))
+            {
+                maximalTasks.add(getRepairFinishedCompactionTask(entry.getKey()));
+            }
+            else
+            {
+                Collection<AbstractCompactionTask> tasks = entry.getValue().getMaximalTask(gcBefore, splitOutput);
+                if (tasks != null)
+                    maximalTasks.addAll(tasks);
+            }
+        }
+        return !maximalTasks.isEmpty() ? maximalTasks : null;
+    }
+
+    Collection<AbstractCompactionStrategy> getStrategies()
+    {
+        return strategies.values();
+    }
+
+    Set<UUID> getSessions()
+    {
+        return strategies.keySet();
+    }
+
+    boolean canCleanup(UUID sessionID)
+    {
+        return !ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
+    }
+
+    /**
+     * calling this when underlying strategy is not LeveledCompactionStrategy is an error
+     */
+    synchronized int[] getSSTableCountPerLevel()
+    {
+        int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+        for (AbstractCompactionStrategy strategy : strategies.values())
+        {
+            assert strategy instanceof LeveledCompactionStrategy;
+            int[] counts = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+            res = CompactionStrategyManager.sumArrays(res, counts);
+        }
+        return res;
+    }
+
+    @SuppressWarnings("resource")
+    synchronized Set<ISSTableScanner> getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+    {
+        if (sstables.isEmpty())
+        {
+            return Collections.emptySet();
+        }
+
+        Map<UUID, Set<SSTableReader>> sessionSSTables = new HashMap<>();
+        for (SSTableReader sstable : sstables)
+        {
+            UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
+            assert sessionID != null;
+            sessionSSTables.computeIfAbsent(sessionID, k -> new HashSet<>()).add(sstable);
+        }
+
+        Set<ISSTableScanner> scanners = new HashSet<>(sessionSSTables.size());
+        for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet())
+        {
+            scanners.addAll(get(entry.getKey()).getScanners(entry.getValue(), ranges).scanners);
+        }
+        return scanners;
+    }
+
+    public boolean hasStrategy(AbstractCompactionStrategy strategy)
+    {
+        return strategies.values().contains(strategy);
+    }
+
+    public Collection<AbstractCompactionTask> createUserDefinedTasks(List<SSTableReader> sstables, int gcBefore)
+    {
+        Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair));
+        return group.entrySet().stream().map(g -> strategies.get(g.getKey()).getUserDefinedTask(g.getValue(), gcBefore)).collect(Collectors.toList());
+    }
+
+    /**
+     * promotes/demotes sstables involved in a consistent repair that has been finalized, or failed
+     */
+    class RepairFinishedCompactionTask extends AbstractCompactionTask
+    {
+        private final UUID sessionID;
+        private final long repairedAt;
+
+        RepairFinishedCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, UUID sessionID, long repairedAt)
+        {
+            super(cfs, transaction);
+            this.sessionID = sessionID;
+            this.repairedAt = repairedAt;
+        }
+
+        @VisibleForTesting
+        UUID getSessionID()
+        {
+            return sessionID;
+        }
+
+        protected void runMayThrow() throws Exception
+        {
+            for (SSTableReader sstable : transaction.originals())
+            {
+                logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, sstable, sessionID);
+                sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+                sstable.reloadSSTableMetadata();
+            }
+            cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals());
+            transaction.abort();
+        }
+
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+        {
+            run();
+            return transaction.originals().size();
+        }
+
+        public int execute(CompactionManager.CompactionExecutorStatsCollector collector)
+        {
+            try
+            {
+                return super.execute(collector);
+            }
+            finally
+            {
+                removeSession(sessionID);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e8eee9a..cd5238f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -152,7 +153,8 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, transaction));
+            StatsMetadata metadata = sstable.getSSTableMetadata();
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, transaction));
 
             DecoratedKey prevKey = null;
 
@@ -257,9 +259,9 @@ public class Scrubber implements Closeable
             if (!outOfOrder.isEmpty())
             {
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
-                long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
+                long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : metadata.repairedAt;
                 SSTableReader newInOrderSstable;
-                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable, transaction))
+                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, sstable, transaction))
                 {
                     for (Partition partition : outOfOrder)
                         inOrderWriter.append(partition.unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 8302a9b..b8c72bb 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -327,6 +328,12 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
     public String toString()
     {
         return String.format("SizeTieredCompactionStrategy[%s/%s]",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 595c46d..0f3c171 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -177,6 +177,12 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         sstables.remove(sstable);
     }
 
+    @Override
+    protected Set<SSTableReader> getSSTables()
+    {
+        return ImmutableSet.copyOf(sstables);
+    }
+
     /**
      * Find the lowest and highest timestamps in a given timestamp/unit pair
      * Returns milliseconds, caller should adjust accordingly