You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/02/07 03:11:47 UTC

[6/7] cassandra git commit: Fix consistency of incrementally repaired data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 34ec1dd..5a60ddd 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
@@ -66,13 +67,14 @@ public class Upgrader
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
-    private SSTableWriter createCompactionWriter(long repairedAt)
+    private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair)
     {
         MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         return SSTableWriter.create(cfs.newSSTableDescriptor(directory),
                                     estimatedRows,
                                     repairedAt,
+                                    parentRepair,
                                     cfs.metadata,
                                     sstableMetadataCollector,
                                     SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)),
@@ -88,7 +90,8 @@ public class Upgrader
              AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
              CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
-            writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
+            StatsMetadata metadata = sstable.getSSTableMetadata();
+            writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair));
             while (iter.hasNext())
                 writer.append(iter.next());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 467d50d..bca6e79 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -234,7 +234,7 @@ public class Verifier implements Closeable
 
     private void markAndThrow() throws IOException
     {
-        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE);
+        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair);
         throw new CorruptSSTableException(new Exception(String.format("Invalid SSTable %s, please force repair", sstable.getFilename())), sstable.getFilename());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..e8f7d72 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long estimatedTotalKeys;
     protected final long maxAge;
     protected final long minRepairedAt;
+    protected final UUID pendingRepair;
 
     protected final SSTableRewriter sstableWriter;
     protected final LifecycleTransaction txn;
@@ -88,6 +90,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+        pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables);
         locations = cfs.getDirectories().getWriteableLocations();
         diskBoundaries = StorageService.getDiskBoundaries(cfs);
         locationIndex = -1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 4ffc747..cda7e38 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -71,6 +71,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
         SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory)),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
+                                                    pendingRepair,
                                                     cfs.metadata,
                                                     new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index c2d3a7d..3959b4b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -107,6 +107,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
         sstableWriter.switchWriter(SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
                 keysPerSSTable,
                 minRepairedAt,
+                pendingRepair,
                 cfs.metadata,
                 new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel),
                 SerializationHeader.make(cfs.metadata(), txn.originals()),
@@ -114,6 +115,5 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                 txn));
         partitionsWritten = 0;
         sstablesWritten = 0;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index eb05a23..c4f84e8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -110,6 +110,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(sstableDirectory)),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
+                                                    pendingRepair,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata().comparator, level),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 3a3e805..a4af783 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -106,6 +106,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         SSTableWriter writer = SSTableWriter.create(cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(location)),
                                                     currentPartitionsToWrite,
                                                     minRepairedAt,
+                                                    pendingRepair,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata().comparator, 0),
                                                     SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 46ca779..4d7c903 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -156,7 +156,7 @@ public class RangeStreamer
         this.address = address;
         this.description = description;
         this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost,
-                true, false, connectSequentially);
+                true, false, connectSequentially, null);
         this.useStrictConsistency = useStrictConsistency;
         this.snitch = snitch;
         this.stateStore = stateStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index b1e15ed..1fa5d8e 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -69,12 +69,13 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         SerializationHeader header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
 
         if (makeRangeAware)
-            return SSTableTxnWriter.createRangeAware(metadata, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, formatType, 0, header);
+            return SSTableTxnWriter.createRangeAware(metadata, 0,  ActiveRepairService.UNREPAIRED_SSTABLE, ActiveRepairService.NO_PENDING_REPAIR, formatType, 0, header);
 
         return SSTableTxnWriter.create(metadata,
                                        createDescriptor(directory, metadata.keyspace, metadata.name, formatType),
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
+                                       ActiveRepairService.NO_PENDING_REPAIR,
                                        0,
                                        header,
                                        Collections.emptySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 47b37ef..7e79fa9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 75797a9..60b8962 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -98,10 +99,10 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     }
 
     @SuppressWarnings("resource") // log and writer closed during doPostCleanup
-    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, int sstableLevel, SerializationHeader header)
     {
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
-        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
@@ -110,6 +111,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     public static SSTableTxnWriter createRangeAware(TableMetadataRef metadata,
                                                     long keyCount,
                                                     long repairedAt,
+                                                    UUID pendingRepair,
                                                     SSTableFormat.Type type,
                                                     int sstableLevel,
                                                     SerializationHeader header)
@@ -120,7 +122,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         SSTableMultiWriter writer;
         try
         {
-            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, type, sstableLevel, 0, txn, header);
+            writer = new RangeAwareSSTableWriter(cfs, keyCount, repairedAt, pendingRepair, type, sstableLevel, 0, txn, header);
         }
         catch (IOException e)
         {
@@ -137,6 +139,7 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
                                           Descriptor descriptor,
                                           long keyCount,
                                           long repairedAt,
+                                          UUID pendingRepair,
                                           int sstableLevel,
                                           SerializationHeader header,
                                           Collection<Index> indexes)
@@ -144,12 +147,12 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
+        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
-    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor desc, long keyCount, long repairedAt, UUID pendingRepair, SerializationHeader header)
     {
-        return create(cfs, desc, keyCount, repairedAt, 0, header);
+        return create(cfs, desc, keyCount, repairedAt, pendingRepair, 0, header);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 2d7d967..a40ec18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.UUID;
 
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
@@ -109,13 +110,14 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
     public static SSTableMultiWriter create(Descriptor descriptor,
                                             long keyCount,
                                             long repairedAt,
+                                            UUID pendingRepair,
                                             TableMetadataRef metadata,
                                             MetadataCollector metadataCollector,
                                             SerializationHeader header,
                                             Collection<Index> indexes,
                                             LifecycleTransaction txn)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, indexes, txn);
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, indexes, txn);
         return new SimpleSSTableMultiWriter(writer, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 766a930..88c60e5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -42,6 +43,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
     private final int sstableLevel;
     private final long estimatedKeys;
     private final long repairedAt;
+    private final UUID pendingRepair;
     private final SSTableFormat.Type format;
     private final SerializationHeader header;
     private final LifecycleTransaction txn;
@@ -51,13 +53,14 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
     private final List<SSTableReader> finishedReaders = new ArrayList<>();
     private SSTableMultiWriter currentWriter = null;
 
-    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
+    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
     {
         directories = cfs.getDirectories().getWriteableLocations();
         this.sstableLevel = sstableLevel;
         this.cfs = cfs;
         this.estimatedKeys = estimatedKeys / directories.length;
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
         this.format = format;
         this.txn = txn;
         this.header = header;
@@ -69,7 +72,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
                 throw new IOException(String.format("Insufficient disk space to store %s",
                                                     FBUtilities.prettyPrintMemory(totalSize)));
             Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(localDir), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
         }
     }
 
@@ -91,7 +94,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
                 finishedWriters.add(currentWriter);
 
             Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format);
-            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 87e12eb..716b27d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1751,6 +1751,17 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return key;
     }
 
+    public boolean isPendingRepair()
+    {
+        return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR;
+    }
+
+    public boolean intersects(Collection<Range<Token>> ranges)
+    {
+        Range<Token> range = new Range<>(first.getToken(), last.getToken());
+        return Iterables.any(ranges, r -> r.intersects(range));
+    }
+
     /**
      * TODO: Move someplace reusable
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 31354a0..1e183e2 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 public abstract class SSTableWriter extends SSTable implements Transactional
 {
     protected long repairedAt;
+    protected UUID pendingRepair;
     protected long maxDataAge = -1;
     protected final long keyCount;
     protected final MetadataCollector metadataCollector;
@@ -75,6 +76,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected SSTableWriter(Descriptor descriptor,
                             long keyCount,
                             long repairedAt,
+                            UUID pendingRepair,
                             TableMetadataRef metadata,
                             MetadataCollector metadataCollector,
                             SerializationHeader header,
@@ -83,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
         this.keyCount = keyCount;
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
         this.metadataCollector = metadataCollector;
         this.header = header;
         this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
@@ -92,6 +95,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     public static SSTableWriter create(Descriptor descriptor,
                                        Long keyCount,
                                        Long repairedAt,
+                                       UUID pendingRepair,
                                        TableMetadataRef metadata,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
@@ -99,43 +103,46 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        LifecycleTransaction txn)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
     }
 
     public static SSTableWriter create(Descriptor descriptor,
                                        long keyCount,
                                        long repairedAt,
+                                       UUID pendingRepair,
                                        int sstableLevel,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         TableMetadataRef metadata = Schema.instance.getTableMetadataRef(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, pendingRepair, sstableLevel, header, indexes, txn);
     }
 
     public static SSTableWriter create(TableMetadataRef metadata,
                                        Descriptor descriptor,
                                        long keyCount,
                                        long repairedAt,
+                                       UUID pendingRepair,
                                        int sstableLevel,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
+        return create(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, indexes, txn);
     }
 
     @VisibleForTesting
     public static SSTableWriter create(Descriptor descriptor,
                                        long keyCount,
                                        long repairedAt,
+                                       UUID pendingRepair,
                                        SerializationHeader header,
                                        Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
-        return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
+        return create(descriptor, keyCount, repairedAt, pendingRepair, 0, header, indexes, txn);
     }
 
     private static Set<Component> components(TableMetadata metadata)
@@ -301,6 +308,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(),
                                                   metadata().params.bloomFilterFpChance,
                                                   repairedAt,
+                                                  pendingRepair,
                                                   header);
     }
 
@@ -329,6 +337,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         public abstract SSTableWriter open(Descriptor descriptor,
                                            long keyCount,
                                            long repairedAt,
+                                           UUID pendingRepair,
                                            TableMetadataRef metadata,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index f900fc4..a07e48f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -51,6 +51,8 @@ public abstract class Version
 
     public abstract boolean hasCommitLogIntervals();
 
+    public abstract boolean hasPendingRepair();
+
     public String getVersion()
     {
         return version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d949197..cad192a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.format.big;
 
 import java.util.Collection;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -83,13 +84,14 @@ public class BigFormat implements SSTableFormat
         public SSTableWriter open(Descriptor descriptor,
                                   long keyCount,
                                   long repairedAt,
+                                  UUID pendingRepair,
                                   TableMetadataRef metadata,
                                   MetadataCollector metadataCollector,
                                   SerializationHeader header,
                                   Collection<SSTableFlushObserver> observers,
                                   LifecycleTransaction txn)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn);
         }
     }
 
@@ -110,13 +112,14 @@ public class BigFormat implements SSTableFormat
     // we always incremented the major version.
     static class BigVersion extends Version
     {
-        public static final String current_version = "mc";
+        public static final String current_version = "md";
         public static final String earliest_supported_version = "ma";
 
         // ma (3.0.0): swap bf hash order
         //             store rows natively
         // mb (3.0.7, 3.7): commit log lower bound included
         // mc (3.0.8, 3.9): commit log intervals included
+        // md (3.0.9, 3.10): pending repair session included
         //
         // NOTE: when adding a new version, please add that to LegacySSTableTest, too.
 
@@ -124,6 +127,7 @@ public class BigFormat implements SSTableFormat
         public final int correspondingMessagingVersion;
         private final boolean hasCommitLogLowerBound;
         private final boolean hasCommitLogIntervals;
+        private final boolean hasPendingRepair;
 
         BigVersion(String version)
         {
@@ -134,6 +138,7 @@ public class BigFormat implements SSTableFormat
 
             hasCommitLogLowerBound = version.compareTo("mb") >= 0;
             hasCommitLogIntervals = version.compareTo("mc") >= 0;
+            hasPendingRepair = version.compareTo("md") >= 0;
         }
 
         @Override
@@ -154,6 +159,11 @@ public class BigFormat implements SSTableFormat
             return hasCommitLogIntervals;
         }
 
+        public boolean hasPendingRepair()
+        {
+            return hasPendingRepair;
+        }
+
         @Override
         public int correspondingMessagingVersion()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index e134f2d..4ae4331 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -17,13 +17,9 @@
  */
 package org.apache.cassandra.io.sstable.format.big;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,13 +67,14 @@ public class BigTableWriter extends SSTableWriter
     public BigTableWriter(Descriptor descriptor,
                           long keyCount,
                           long repairedAt,
+                          UUID pendingRepair,
                           TableMetadataRef metadata,
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
                           Collection<SSTableFlushObserver> observers,
                           LifecycleTransaction txn)
     {
-        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
+        super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers);
         txn.trackNew(this); // must track before any files are created
 
         if (compression)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index 100cfdb..6a40d94 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable.metadata;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -70,7 +71,7 @@ public interface IMetadataSerializer
     void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
 
     /**
-     * Mutate repairedAt time
+     * Mutate the repairedAt time and pendingRepair ID
      */
-    void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException;
+    void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 3b32ae2..6af93ad 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
@@ -80,7 +81,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
                                  true,
                                  ActiveRepairService.UNREPAIRED_SSTABLE,
                                  -1,
-                                 -1);
+                                 -1,
+                                 null);
     }
 
     protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
@@ -275,7 +277,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
         this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
     }
 
-    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header)
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header)
     {
         Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -296,7 +298,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
                                                              hasLegacyCounterShards,
                                                              repairedAt,
                                                              totalColumnsSet,
-                                                             totalRows));
+                                                             totalRows,
+                                                             pendingRepair));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
         components.put(MetadataType.HEADER, header.toComponent());
         return components;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index e6e0953..2c1e0ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -136,13 +136,14 @@ public class MetadataSerializer implements IMetadataSerializer
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 
-    public void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException
+    public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException
     {
-        logger.trace("Mutating {} to repairedAt time {}", descriptor.filenameFor(Component.STATS), newRepairedAt);
+        logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}",
+                     descriptor.filenameFor(Component.STATS), newRepairedAt, newPendingRepair);
         Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
         StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
-        // mutate level
-        currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt));
+        // mutate time & id
+        currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair));
         rewriteSSTableMetadata(descriptor, currentComponents);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 0f6434b..fe5d7bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -34,6 +35,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * SSTable metadata that always stay on heap.
@@ -61,6 +63,7 @@ public class StatsMetadata extends MetadataComponent
     public final long repairedAt;
     public final long totalColumnsSet;
     public final long totalRows;
+    public final UUID pendingRepair;
 
     public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
@@ -79,7 +82,8 @@ public class StatsMetadata extends MetadataComponent
                          boolean hasLegacyCounterShards,
                          long repairedAt,
                          long totalColumnsSet,
-                         long totalRows)
+                         long totalRows,
+                         UUID pendingRepair)
     {
         this.estimatedPartitionSize = estimatedPartitionSize;
         this.estimatedColumnCount = estimatedColumnCount;
@@ -99,6 +103,7 @@ public class StatsMetadata extends MetadataComponent
         this.repairedAt = repairedAt;
         this.totalColumnsSet = totalColumnsSet;
         this.totalRows = totalRows;
+        this.pendingRepair = pendingRepair;
     }
 
     public MetadataType getType()
@@ -149,7 +154,8 @@ public class StatsMetadata extends MetadataComponent
                                  hasLegacyCounterShards,
                                  repairedAt,
                                  totalColumnsSet,
-                                 totalRows);
+                                 totalRows,
+                                 pendingRepair);
     }
 
     public StatsMetadata mutateRepairedAt(long newRepairedAt)
@@ -171,7 +177,31 @@ public class StatsMetadata extends MetadataComponent
                                  hasLegacyCounterShards,
                                  newRepairedAt,
                                  totalColumnsSet,
-                                 totalRows);
+                                 totalRows,
+                                 pendingRepair);
+    }
+
+    public StatsMetadata mutatePendingRepair(UUID newPendingRepair)
+    {
+        return new StatsMetadata(estimatedPartitionSize,
+                                 estimatedColumnCount,
+                                 commitLogIntervals,
+                                 minTimestamp,
+                                 maxTimestamp,
+                                 minLocalDeletionTime,
+                                 maxLocalDeletionTime,
+                                 minTTL,
+                                 maxTTL,
+                                 compressionRatio,
+                                 estimatedTombstoneDropTime,
+                                 sstableLevel,
+                                 minClusteringValues,
+                                 maxClusteringValues,
+                                 hasLegacyCounterShards,
+                                 repairedAt,
+                                 totalColumnsSet,
+                                 totalRows,
+                                 newPendingRepair);
     }
 
     @Override
@@ -200,6 +230,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(hasLegacyCounterShards, that.hasLegacyCounterShards)
                        .append(totalColumnsSet, that.totalColumnsSet)
                        .append(totalRows, that.totalRows)
+                       .append(pendingRepair, that.pendingRepair)
                        .build();
     }
 
@@ -225,6 +256,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(hasLegacyCounterShards)
                        .append(totalColumnsSet)
                        .append(totalRows)
+                       .append(pendingRepair)
                        .build();
     }
 
@@ -253,6 +285,13 @@ public class StatsMetadata extends MetadataComponent
                 size += CommitLogPosition.serializer.serializedSize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE));
             if (version.hasCommitLogIntervals())
                 size += commitLogPositionSetSerializer.serializedSize(component.commitLogIntervals);
+
+            if (version.hasPendingRepair())
+            {
+                size += 1;
+                if (component.pendingRepair != null)
+                    size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0);
+            }
             return size;
         }
 
@@ -286,6 +325,19 @@ public class StatsMetadata extends MetadataComponent
                 CommitLogPosition.serializer.serialize(component.commitLogIntervals.lowerBound().orElse(CommitLogPosition.NONE), out);
             if (version.hasCommitLogIntervals())
                 commitLogPositionSetSerializer.serialize(component.commitLogIntervals, out);
+
+            if (version.hasPendingRepair())
+            {
+                if (component.pendingRepair != null)
+                {
+                    out.writeByte(1);
+                    UUIDSerializer.serializer.serialize(component.pendingRepair, out, 0);
+                }
+                else
+                {
+                    out.writeByte(0);
+                }
+            }
         }
 
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -328,6 +380,12 @@ public class StatsMetadata extends MetadataComponent
             else
                 commitLogIntervals = new IntervalSet<CommitLogPosition>(commitLogLowerBound, commitLogUpperBound);
 
+            UUID pendingRepair = null;
+            if (version.hasPendingRepair() && in.readByte() != 0)
+            {
+                pendingRepair = UUIDSerializer.serializer.deserialize(in, 0);
+            }
+
             return new StatsMetadata(partitionSizes,
                                      columnCounts,
                                      commitLogIntervals,
@@ -345,7 +403,8 @@ public class StatsMetadata extends MetadataComponent
                                      hasLegacyCounterShards,
                                      repairedAt,
                                      totalColumnsSet,
-                                     totalRows);
+                                     totalRows,
+                                     pendingRepair);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index b97b836..19bf3d4 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
deleted file mode 100644
index 6e6bb65..0000000
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.AbstractFuture;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.IAsyncCallbackWithFailure;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
-import org.apache.cassandra.utils.CassandraVersion;
-
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
-                                                                               IFailureDetectionEventListener
-{
-    /*
-     * Version that anticompaction response is not supported up to.
-     * If Cassandra version is more than this, we need to wait for anticompaction response.
-     */
-    private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
-    private static Logger logger = LoggerFactory.getLogger(AnticompactionTask.class);
-
-    private final UUID parentSession;
-    private final InetAddress neighbor;
-    private final Collection<Range<Token>> successfulRanges;
-    private final AtomicBoolean isFinished = new AtomicBoolean(false);
-
-    public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
-    {
-        this.parentSession = parentSession;
-        this.neighbor = neighbor;
-        this.successfulRanges = successfulRanges;
-    }
-
-    public void run()
-    {
-        if (FailureDetector.instance.isAlive(neighbor))
-        {
-            AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
-            CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
-            if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
-            {
-                MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
-            }
-            else
-            {
-                // immediately return after sending request
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
-                maybeSetResult(neighbor);
-            }
-        }
-        else
-        {
-            maybeSetException(new IOException(neighbor + " is down"));
-        }
-    }
-
-    private boolean maybeSetException(Throwable t)
-    {
-        if (isFinished.compareAndSet(false, true))
-        {
-            setException(t);
-            return true;
-        }
-        return false;
-    }
-
-    private boolean maybeSetResult(InetAddress o)
-    {
-        if (isFinished.compareAndSet(false, true))
-        {
-            set(o);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
-     */
-    public class AnticompactionCallback implements IAsyncCallbackWithFailure
-    {
-        final AnticompactionTask task;
-
-        public AnticompactionCallback(AnticompactionTask task)
-        {
-            this.task = task;
-        }
-
-        public void response(MessageIn msg)
-        {
-            maybeSetResult(msg.from);
-        }
-
-        public boolean isLatencyForSnitch()
-        {
-            return false;
-        }
-
-        public void onFailure(InetAddress from, RequestFailureReason failureReason)
-        {
-            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
-        }
-    }
-
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
-
-    public void onRemove(InetAddress endpoint)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void onRestart(InetAddress endpoint, EndpointState epState)
-    {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!neighbor.equals(endpoint))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
-        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
-        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
-        if (maybeSetException(exception))
-        {
-            // Though unlikely, it is possible to arrive here multiple time and we want to avoid print an error message twice
-            logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index cfc181e..56411d9 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.List;
+import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,13 +47,15 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
     private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
 
     private final long repairedAt;
+    private final UUID pendingRepair;
 
     private final boolean pullRepair;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair)
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, UUID pendingRepair, boolean pullRepair)
     {
         super(desc, r1, r2);
         this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
         this.pullRepair = pullRepair;
     }
 
@@ -76,7 +79,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
             isIncremental = prs.isIncremental;
         }
         Tracing.traceRepair(message);
-        StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
+        StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 7fc7816..07bc1e2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -42,6 +42,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     private final RepairParallelism parallelismDegree;
     private final long repairedAt;
     private final ListeningExecutorService taskExecutor;
+    private final boolean isConsistent;
 
     /**
      * Create repair job to run on specific columnfamily
@@ -49,13 +50,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily)
+    public RepairJob(RepairSession session, String columnFamily, boolean isConsistent)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
         this.repairedAt = session.repairedAt;
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
+        this.isConsistent = isConsistent;
     }
 
     /**
@@ -73,16 +75,26 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         // Create a snapshot at all nodes unless we're using pure parallel repairs
         if (parallelismDegree != RepairParallelism.PARALLEL)
         {
-            // Request snapshot to all replica
-            List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
-            for (InetAddress endpoint : allEndpoints)
+            ListenableFuture<List<InetAddress>> allSnapshotTasks;
+            if (isConsistent)
             {
-                SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
-                snapshotTasks.add(snapshotTask);
-                taskExecutor.execute(snapshotTask);
+                // consistent repair does its own "snapshotting"
+                allSnapshotTasks = Futures.immediateFuture(allEndpoints);
             }
+            else
+            {
+                // Request a snapshot from every replica
+                List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+                for (InetAddress endpoint : allEndpoints)
+                {
+                    SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
+                    snapshotTasks.add(snapshotTask);
+                    taskExecutor.execute(snapshotTask);
+                }
+                allSnapshotTasks = Futures.allAsList(snapshotTasks);
+            }
+
             // When all snapshot complete, send validation requests
-            ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks);
             validations = Futures.transform(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>()
             {
                 public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints)
@@ -118,7 +130,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair);
+                            task = new LocalSyncTask(desc, r1, r2, repairedAt, isConsistent ? desc.parentSessionId : null, session.pullRepair);
                         }
                         else
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98d74ed9/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index d7736f0..4f412f0 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -21,8 +21,6 @@ import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.base.Predicate;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService;
 public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
+
+    private boolean isConsistent(UUID sessionID)
+    {
+        return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
+    }
+
     public void doVerb(final MessageIn<RepairMessage> message, final int id)
     {
         // TODO add cancel/interrupt message
@@ -122,7 +126,8 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                         return;
                     }
 
-                    Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
+                    ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
+                    Validator validator = new Validator(desc, message.from, validationRequest.gcBefore, isConsistent(desc.parentSessionId));
                     CompactionManager.instance.submitValidation(store, validator);
                     break;
 
@@ -134,24 +139,10 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
                         repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).getRepairedAt();
 
-                    StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt);
+                    StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt, isConsistent(desc.parentSessionId));
                     task.run();
                     break;
 
-                case ANTICOMPACTION_REQUEST:
-                    AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
-                    logger.debug("Got anticompaction request {}", anticompactionRequest);
-                    ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
-                    compactionDone.addListener(new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
-                        }
-                    }, MoreExecutors.directExecutor());
-                    break;
-
                 case CLEANUP:
                     logger.debug("cleaning up repair");
                     CleanupMessage cleanup = (CleanupMessage) message.payload;
@@ -159,6 +150,40 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                     break;
 
+                case CONSISTENT_REQUEST:
+                    ActiveRepairService.instance.consistent.local.handlePrepareMessage(message.from, (PrepareConsistentRequest) message.payload);
+                    break;
+
+                case CONSISTENT_RESPONSE:
+                    ActiveRepairService.instance.consistent.coordinated.handlePrepareResponse((PrepareConsistentResponse) message.payload);
+                    break;
+
+                case FINALIZE_PROPOSE:
+                    ActiveRepairService.instance.consistent.local.handleFinalizeProposeMessage(message.from, (FinalizePropose) message.payload);
+                    break;
+
+                case FINALIZE_PROMISE:
+                    ActiveRepairService.instance.consistent.coordinated.handleFinalizePromiseMessage((FinalizePromise) message.payload);
+                    break;
+
+                case FINALIZE_COMMIT:
+                    ActiveRepairService.instance.consistent.local.handleFinalizeCommitMessage(message.from, (FinalizeCommit) message.payload);
+                    break;
+
+                case FAILED_SESSION:
+                    FailSession failure = (FailSession) message.payload;
+                    ActiveRepairService.instance.consistent.coordinated.handleFailSessionMessage(failure);
+                    ActiveRepairService.instance.consistent.local.handleFailSessionMessage(message.from, failure);
+                    break;
+
+                case STATUS_REQUEST:
+                    ActiveRepairService.instance.consistent.local.handleStatusRequest(message.from, (StatusRequest) message.payload);
+                    break;
+
+                case STATUS_RESPONSE:
+                    ActiveRepairService.instance.consistent.local.handleStatusResponse(message.from, (StatusResponse) message.payload);
+                    break;
+
                 default:
                     ActiveRepairService.instance.handleMessage(message.from, message.payload);
                     break;