You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/02/08 08:48:57 UTC

[1/3] Avoid repairing already repaired data.

Updated Branches:
  refs/heads/trunk bcfaeaa9c -> a7b72140b


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 38c6d56..8eb9199 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.UUID;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -36,25 +38,42 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class LeveledCompactionStrategyTest extends SchemaLoader
 {
+    private String ksname = "Keyspace1";
+    private String cfname = "StandardLeveled";
+    private Keyspace keyspace = Keyspace.open(ksname);
+    private ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+
+    @Before
+    public void enableCompaction()
+    {
+        cfs.enableAutoCompaction();
+    }
+
+    /**
+     * Since we use StandardLeveled CF for every test, we want to clean up after the test.
+     */
+    @After
+    public void truncateSTandardLeveled()
+    {
+        cfs.truncateBlocking();
+    }
+
     /*
      * This exercises in particular the code of #4142
      */
     @Test
     public void testValidationMultipleSSTablePerLevel() throws Exception
     {
-        String ksname = "Keyspace1";
-        String cfname = "StandardLeveled";
-        Keyspace keyspace = Keyspace.open(ksname);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 
         // Enough data to have a level 1 and 2
@@ -80,9 +99,11 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         assert strategy.getLevelSize(1) > 0;
         assert strategy.getLevelSize(2) > 0;
 
-        Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
+        Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), ksname, cfname, range);
+        UUID parentRepSession = UUID.randomUUID();
+        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range));
+        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range);
         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();
     }
@@ -101,11 +122,6 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
     @Test
     public void testCompactionProgress() throws Exception
     {
-        String ksname = "Keyspace1";
-        String cfname = "StandardLeveled";
-        Keyspace keyspace = Keyspace.open(ksname);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-
         // make sure we have SSTables in L1
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]);
         int rows = 2;
@@ -142,11 +158,6 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
     @Test
     public void testMutateLevel() throws Exception
     {
-        String ksname = "Keyspace1";
-        String cfname = "StandardLeveled";
-        Keyspace keyspace = Keyspace.open(ksname);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 
         // Enough data to have a level 1 and 2
@@ -189,4 +200,89 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         // verify that the manifest has correct amount of sstables
         assertEquals(cfs.getSSTables().size(), levels[6]);
     }
+
+    @Test
+    public void testNewRepairedSSTable() throws Exception
+    {
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+        // Enough data to have a level 1 and 2
+        int rows = 20;
+        int columns = 10;
+
+        // Adds enough data to trigger multiple sstables per level
+        for (int r = 0; r < rows; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            Mutation rm = new Mutation(ksname, key.key);
+            for (int c = 0; c < columns; c++)
+            {
+                rm.add(cfname, Util.cellname("column" + c), value, 0);
+            }
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        waitForLeveling(cfs);
+        cfs.disableAutoCompaction();
+
+        while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
+            Thread.sleep(100);
+
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        assertTrue(strategy.getLevelSize(1) > 0);
+        assertTrue(strategy.getLevelSize(2) > 0);
+
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            assertFalse(sstable.isRepaired());
+        }
+        int sstableCount = 0;
+        for (List<SSTableReader> level : strategy.manifest.generations)
+            sstableCount += level.size();
+
+        assertEquals(sstableCount, cfs.getSSTables().size());
+
+        assertFalse(strategy.manifest.hasRepairedData());
+        assertTrue(strategy.manifest.unrepairedL0.size() == 0);
+
+        SSTableReader sstable1 = strategy.manifest.generations[2].get(0);
+        SSTableReader sstable2 = strategy.manifest.generations[1].get(0);
+
+        // "repair" an sstable:
+        strategy.manifest.remove(sstable1);
+        sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis());
+        sstable1.reloadSSTableMetadata();
+        assertTrue(sstable1.isRepaired());
+
+        // make sure adding a repaired sstable leaves only repaired data in the levels; unrepaired sstables move to unrepairedL0:
+        strategy.manifest.add(sstable1);
+        assertTrue(strategy.manifest.hasRepairedData());
+        assertTrue(strategy.manifest.generations[2].contains(sstable1));
+        assertFalse(strategy.manifest.generations[1].contains(sstable2));
+        assertTrue(strategy.manifest.unrepairedL0.contains(sstable2));
+        sstableCount = 0;
+        for (int i = 0; i < strategy.manifest.generations.length; i++)
+        {
+            sstableCount += strategy.manifest.generations[i].size();
+            if (i != 2)
+                assertEquals(strategy.manifest.generations[i].size(), 0);
+            else
+                assertEquals(strategy.manifest.generations[i].size(), 1);
+        }
+        assertEquals(1, sstableCount);
+
+        // make sure adding an unrepaired sstable puts it in unrepairedL0:
+        strategy.manifest.remove(sstable2);
+        strategy.manifest.add(sstable2);
+        assertTrue(strategy.manifest.unrepairedL0.contains(sstable2));
+        assertEquals(strategy.manifest.unrepairedL0.size(), cfs.getSSTables().size() - 1);
+
+        // make sure repairing an sstable takes it away from unrepairedL0 and puts it in the correct level:
+        strategy.manifest.remove(sstable2);
+        sstable2.descriptor.getMetadataSerializer().mutateRepairedAt(sstable2.descriptor, System.currentTimeMillis());
+        sstable2.reloadSSTableMetadata();
+        strategy.manifest.add(sstable2);
+        assertFalse(strategy.manifest.unrepairedL0.contains(sstable2));
+        assertTrue(strategy.manifest.generations[1].contains(sstable2));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index ab6506d..ae4e12b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -112,7 +112,7 @@ public class LegacySSTableTest extends SchemaLoader
         ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
         details.add(new StreamSession.SSTableStreamingSections(sstable,
                                                                sstable.getPositionsForRanges(ranges),
-                                                               sstable.estimatedKeysForRanges(ranges)));
+                                                               sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details)
                                              .execute().get();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 561b36c..b25e49c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -25,6 +25,7 @@ import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import org.apache.cassandra.Util;
@@ -198,7 +199,7 @@ public class SSTableUtils
         public SSTableReader write(int expectedSize, Appender appender) throws IOException
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
-            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize);
+            SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE);
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.closeAndOpenReader();
             // mark all components for removal

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 6793c85..6b66746 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -65,7 +65,7 @@ public class MetadataSerializerTest
 
         String partitioner = RandomPartitioner.class.getCanonicalName();
         double bfFpChance = 0.1;
-        Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance);
+        Map<MetadataType, MetadataComponent> originalMetadata = collector.finalizeMetadata(partitioner, bfFpChance, 0);
 
         MetadataSerializer serializer = new MetadataSerializer();
         // Serialize to tmp file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
index b6dce40..7adbb06 100644
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -26,13 +28,17 @@ import org.junit.After;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.sink.IMessageSink;
 import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
@@ -82,7 +88,7 @@ public class DifferencerTest extends SchemaLoader
             }
         });
         Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), "Keyspace1", "Standard1", range);
 
         MerkleTree tree1 = createInitialTree(desc);
         MerkleTree tree2 = createInitialTree(desc);
@@ -101,7 +107,13 @@ public class DifferencerTest extends SchemaLoader
     public void testDifference() throws Throwable
     {
         Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), "Keyspace1", "Standard1", range);
+        UUID parentRepairSession = UUID.randomUUID();
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range));
+
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), "Keyspace1", "Standard1", range);
 
         MerkleTree tree1 = createInitialTree(desc);
         MerkleTree tree2 = createInitialTree(desc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 757254c..05936e2 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -65,7 +65,7 @@ public class ValidatorTest extends SchemaLoader
     public void testValidatorComplete() throws Throwable
     {
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
         final SimpleCondition lock = new SimpleCondition();
         SinkManager.add(new IMessageSink()
@@ -146,7 +146,7 @@ public class ValidatorTest extends SchemaLoader
     public void testValidatorFailed() throws Throwable
     {
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), keyspace, columnFamily, range);
+        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
 
         final SimpleCondition lock = new SimpleCondition();
         SinkManager.add(new IMessageSink()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 78ae1c1..6e485fa 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -102,7 +102,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
         local_range = StorageService.instance.getPrimaryRangesForEndpoint(keyspaceName, LOCAL).iterator().next();
 
-        desc = new RepairJobDesc(UUID.randomUUID(), keyspaceName, cfname, local_range);
+        desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspaceName, cfname, local_range);
         // Set a fake session corresponding to this fake request
         ActiveRepairService.instance.submitArtificialRepairSession(desc);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index b47f4d8..f1d23f0 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.NodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
@@ -49,7 +50,7 @@ public class SerializationsTest extends AbstractSerializationsTester
 
     private static final UUID RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
     private static final Range<Token> FULL_RANGE = new Range<>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
-    private static final RepairJobDesc DESC = new RepairJobDesc(RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
+    private static final RepairJobDesc DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
 
     private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index c7e7ee7..e0a89c3 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -213,7 +213,7 @@ public class StreamingTransferTest extends SchemaLoader
         {
             details.add(new StreamSession.SSTableStreamingSections(sstable,
                                                                    sstable.getPositionsForRanges(ranges),
-                                                                   sstable.estimatedKeysForRanges(ranges)));
+                                                                   sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt));
         }
         return details;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 5f0fb5c..df7a988 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
@@ -56,7 +57,7 @@ public class SSTableExportTest extends SchemaLoader
     {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colA"), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
@@ -82,7 +83,7 @@ public class SSTableExportTest extends SchemaLoader
     {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colA"), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
@@ -117,7 +118,7 @@ public class SSTableExportTest extends SchemaLoader
     {
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         int nowInSec = (int)(System.currentTimeMillis() / 1000) + 42; //live for 42 seconds
         // Add rowA
@@ -173,7 +174,7 @@ public class SSTableExportTest extends SchemaLoader
         ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard1");
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("name"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
@@ -212,7 +213,7 @@ public class SSTableExportTest extends SchemaLoader
     {
         File tempSS = tempSSTableFile("Keyspace1", "Counter1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Counter1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(CounterCell.createLocal(Util.cellname("colA"), 42, System.currentTimeMillis(), Long.MIN_VALUE));
@@ -243,7 +244,7 @@ public class SSTableExportTest extends SchemaLoader
     {
         File tempSS = tempSSTableFile("Keyspace1", "ValuesWithQuotes");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "ValuesWithQuotes");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(new Cell(Util.cellname("data"), UTF8Type.instance.fromString("{\"foo\":\"bar\"}")));
@@ -275,7 +276,7 @@ public class SSTableExportTest extends SchemaLoader
 
         File tempSS = tempSSTableFile("Keyspace1", "Standard1");
         ColumnFamily cfamily = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
+        SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         // Add rowA
         cfamily.addColumn(Util.cellname("colName"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());


[2/3] Avoid repairing already repaired data.

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index acc8aab..bb66b69 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -17,15 +17,30 @@
  */
 package org.apache.cassandra.repair;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.PrepareMessage;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles all repair related message.
@@ -34,16 +49,33 @@ import org.apache.cassandra.service.ActiveRepairService;
  */
 public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 {
+    private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
     public void doVerb(MessageIn<RepairMessage> message, int id)
     {
         // TODO add cancel/interrupt message
         RepairJobDesc desc = message.payload.desc;
         switch (message.payload.messageType)
         {
+            case PREPARE_MESSAGE:
+                PrepareMessage prepareMessage = (PrepareMessage) message.payload;
+                List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
+                for (UUID cfId : prepareMessage.cfIds)
+                {
+                    Pair<String, String> kscf = Schema.instance.getCF(cfId);
+                    ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                    columnFamilyStores.add(columnFamilyStore);
+                }
+                ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
+                                                                         columnFamilyStores,
+                                                                         prepareMessage.ranges);
+                MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                break;
+
             case VALIDATION_REQUEST:
                 ValidationRequest validationRequest = (ValidationRequest) message.payload;
                 // trigger read-only compaction
                 ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+
                 Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
                 CompactionManager.instance.submitValidation(store, validator);
                 break;
@@ -55,6 +87,21 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 task.run();
                 break;
 
+            case ANTICOMPACTION_REQUEST:
+                logger.debug("Got anticompaction request");
+                AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
+                try
+                {
+                    List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+                    FBUtilities.waitOnFutures(futures);
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                break;
+
             default:
                 ActiveRepairService.instance.handleMessage(message.from, message.payload);
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 3933a88..75d5209 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -93,6 +93,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
     private final SimpleCondition completed = new SimpleCondition();
     public final Condition differencingDone = new SimpleCondition();
+    public final UUID parentRepairSession;
 
     private volatile boolean terminated = false;
 
@@ -102,23 +103,24 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      * @param range range to repair
      * @param keyspace name of keyspace
      * @param isSequential true if performing repair on snapshots sequentially
-     * @param dataCenters the data centers that should be part of the repair; null for all DCs
+     * @param endpoints the endpoints that should be part of the repair
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+    public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
     {
-        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
+        this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames);
     }
 
-    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String[] cfnames)
+    public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String[] cfnames)
     {
+        this.parentRepairSession = parentRepairSession;
         this.id = id;
         this.isSequential = isSequential;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
         this.range = range;
-        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
+        this.endpoints = endpoints;
     }
 
     public UUID getId()
@@ -260,15 +262,16 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential);
+                RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential);
                 jobs.offer(job);
             }
-
+            logger.debug("Sending tree requests to endpoints {}", endpoints);
             jobs.peek().sendTreeRequests(endpoints);
 
             // block whatever thread started this session until all requests have been returned:
             // if this thread dies, the session will still complete in the background
             completed.await();
+
             if (exception == null)
             {
                 logger.info(String.format("[repair #%s] session completed successfully", getId()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 1fd2b4f..636568c 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -65,8 +66,12 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
 
     private void initiateStreaming()
     {
+        long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+        if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
+            repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
+
         logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
-        StreamResultFuture op = new StreamPlan("Repair")
+        StreamResultFuture op = new StreamPlan("Repair", repairedAt)
                                     .flushBeforeTransfer(true)
                                     // request ranges from the remote node
                                     .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index f546410..b195852 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -42,7 +42,9 @@ public abstract class RepairMessage
         VALIDATION_REQUEST(0, ValidationRequest.serializer),
         VALIDATION_COMPLETE(1, ValidationComplete.serializer),
         SYNC_REQUEST(2, SyncRequest.serializer),
-        SYNC_COMPLETE(3, SyncComplete.serializer);
+        SYNC_COMPLETE(3, SyncComplete.serializer),
+        ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
+        PREPARE_MESSAGE(5, PrepareMessage.serializer);
 
         private final byte type;
         private final MessageSerializer<RepairMessage> serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b77f216..dc4c66a 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
@@ -24,19 +26,34 @@ import java.util.concurrent.*;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.*;
+import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.repair.messages.PrepareMessage;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
@@ -54,9 +71,12 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class ActiveRepairService
 {
+    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
     // singleton enforcement
     public static final ActiveRepairService instance = new ActiveRepairService();
 
+    public static final long UNREPAIRED_SSTABLE = 0;
+
     private static final ThreadPoolExecutor executor;
     static
     {
@@ -74,16 +94,20 @@ public class ActiveRepairService
     }
 
     /**
-     * A map of active session.
+     * A map of active coordinator sessions.
      */
     private final ConcurrentMap<UUID, RepairSession> sessions;
 
+    private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions;
+
+    private CountDownLatch prepareLatch = null;
     /**
      * Protected constructor. Use ActiveRepairService.instance.
      */
     protected ActiveRepairService()
     {
         sessions = new ConcurrentHashMap<>();
+        parentRepairSessions = new ConcurrentHashMap<>();
     }
 
     /**
@@ -91,9 +115,9 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, String... cfnames)
+    public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames)
     {
-        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, cfnames);
+        RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
         if (session.endpoints.isEmpty())
             return null;
         RepairFuture futureTask = new RepairFuture(session);
@@ -121,13 +145,16 @@ public class ActiveRepairService
         {
             session.forceShutdown();
         }
+        parentRepairSessions.clear();
     }
 
     // for testing only. Create a session corresponding to a fake request and
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, new String[]{desc.columnFamily});
+        Set<InetAddress> neighbours = new HashSet<>();
+        neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null));
+        RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily});
         sessions.put(session.getId(), session);
         RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
@@ -186,6 +213,122 @@ public class ActiveRepairService
         return neighbors;
     }
 
+    public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+    {
+        UUID parentRepairSession = UUIDGen.getTimeUUID();
+        registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
+        prepareLatch = new CountDownLatch(endpoints.size());
+        IAsyncCallback callback = new IAsyncCallback()
+        {
+            @Override
+            public void response(MessageIn msg)
+            {
+                ActiveRepairService.this.prepareLatch.countDown();
+            }
+
+            @Override
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
+        };
+
+        List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
+        for (ColumnFamilyStore cfs : columnFamilyStores)
+            cfIds.add(cfs.metadata.cfId);
+
+        for(InetAddress neighbour : endpoints)
+        {
+            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
+            MessageOut<RepairMessage> msg = message.createMessage();
+            MessagingService.instance().sendRR(msg, neighbour, callback);
+        }
+        try
+        {
+            prepareLatch.await(1, TimeUnit.HOURS);
+        }
+        catch (InterruptedException e)
+        {
+            parentRepairSessions.remove(parentRepairSession);
+            throw new RuntimeException("Did not get replies from all endpoints.", e);
+        }
+        return parentRepairSession;
+    }
+
+    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+    {
+        Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
+        for (ColumnFamilyStore cfs : columnFamilyStores)
+        {
+            Set<SSTableReader> sstables = new HashSet<>();
+            for (SSTableReader sstable : cfs.getSSTables())
+            {
+                if (new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
+                {
+                    if (!sstable.isRepaired())
+                    {
+                        sstables.add(sstable);
+                    }
+                }
+            }
+            sstablesToRepair.put(cfs.metadata.cfId, sstables);
+        }
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis()));
+    }
+
+    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) throws InterruptedException, ExecutionException, IOException
+    {
+
+        for (InetAddress neighbor : neighbors)
+        {
+            AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+            MessageOut<RepairMessage> req = acr.createMessage();
+            MessagingService.instance().sendOneWay(req, neighbor);
+        }
+        try
+        {
+            List<Future<?>> futures = doAntiCompaction(parentSession);
+            FBUtilities.waitOnFutures(futures);
+        }
+        finally
+        {
+            parentRepairSessions.remove(parentSession);
+        }
+    }
+
+    public ParentRepairSession getParentRepairSession(UUID parentSessionId)
+    {
+        return parentRepairSessions.get(parentSessionId);
+    }
+
+    public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException
+    {
+        assert parentRepairSession != null;
+        ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
+        {
+
+            Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
+            ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
+            boolean success = false;
+            while (!success)
+            {
+                for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+                {
+                    if (sstables.remove(compactingSSTable))
+                        SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
+                }
+                success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+            }
+
+            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
+        }
+
+        return futures;
+    }
+
     public void handleMessage(InetAddress endpoint, RepairMessage message)
     {
         RepairJobDesc desc = message.desc;
@@ -207,4 +350,41 @@ public class ActiveRepairService
                 break;
         }
     }
+
+    public static class ParentRepairSession
+    {
+        public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
+        public final Collection<Range<Token>> ranges;
+        public final Map<UUID, Set<SSTableReader>> sstableMap;
+        public final long repairedAt;
+
+        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, Map<UUID, Set<SSTableReader>> sstables, long repairedAt)
+        {
+            for (ColumnFamilyStore cfs : columnFamilyStores)
+                this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
+            this.ranges = ranges;
+            this.sstableMap = sstables;
+            this.repairedAt = repairedAt;
+        }
+
+        public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId)
+        {
+            Set<SSTableReader> sstables = sstableMap.get(cfId);
+            Iterator<SSTableReader> sstableIterator = sstables.iterator();
+            while (sstableIterator.hasNext())
+            {
+                SSTableReader sstable = sstableIterator.next();
+                if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists())
+                {
+                    sstableIterator.remove();
+                }
+                else
+                {
+                    if (!sstable.acquireReference())
+                        sstableIterator.remove();
+                }
+            }
+            return sstables;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4d6e13f..99090b9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1393,7 +1393,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * Handle node bootstrap
      *
      * @param endpoint bootstrapping node
-     * @param pieces STATE_BOOTSTRAPPING,bootstrap token as string
      */
     private void handleStateBootstrap(InetAddress endpoint)
     {
@@ -2418,13 +2417,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         sendNotification(jmxNotification);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies) throws IOException
     {
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
-        return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+
+        return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, fullRepair, columnFamilies);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) throws IOException
     {
         if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
             return 0;
@@ -2432,18 +2432,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         final int cmd = nextRepairCommand.incrementAndGet();
         if (ranges.size() > 0)
         {
-            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies)).start();
+            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, fullRepair, columnFamilies)).start();
         }
         return cmd;
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final boolean fullRepair, final String... columnFamilies)
     {
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
-        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
+        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies)
     {
         if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
             return 0;
@@ -2451,29 +2451,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         final int cmd = nextRepairCommand.incrementAndGet();
         if (ranges.size() > 0)
         {
-            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
+            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
         }
         return cmd;
     }
 
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean fullRepair, final String... columnFamilies) throws IOException
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
 
         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
     }
 
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies)
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
 
         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, isLocal, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), fullRepair, columnFamilies);
     }
 
 
@@ -2483,53 +2483,72 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+    public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
     {
-        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, fullRepair, columnFamilies);
     }
 
-    public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
     {
-        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, isLocal, fullRepair, columnFamilies);
     }
 
-    public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
 
         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal, fullRepair, columnFamilies);
     }
 
-    public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, boolean fullRepair, final String... columnFamilies) throws IOException
     {
         if (Keyspace.SYSTEM_KS.equalsIgnoreCase(keyspaceName))
             return;
-        createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
+        createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, fullRepair, columnFamilies).run();
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd,
+                                                final String keyspace,
+                                                final Collection<Range<Token>> ranges,
+                                                final boolean isSequential,
+                                                final boolean isLocal,
+                                                final boolean fullRepair,
+                                                final String... columnFamilies)
     {
         Set<String> dataCenters = null;
         if (isLocal)
         {
             dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
         }
-        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, fullRepair, columnFamilies);
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd,
+                                                final String keyspace,
+                                                final Collection<Range<Token>> ranges,
+                                                final boolean isSequential,
+                                                final Collection<String> dataCenters,
+                                                final boolean fullRepair,
+                                                final String... columnFamilies)
     {
-        return new FutureTask<Object>(new WrappedRunnable()
+        return new FutureTask<>(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
             {
-                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
+                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair);
                 logger.info(message);
                 sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
 
+                if (isSequential && !fullRepair)
+                {
+                    message = "It is not possible to mix sequential repair and incremental repairs.";
+                    logger.error(message);
+                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                    return;
+                }
                 if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
                 {
                     message = String.format("Cancelling repair command #%d (the local data center must be part of the repair)", cmd);
@@ -2538,13 +2557,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     return;
                 }
 
-                List<RepairFuture> futures = new ArrayList<RepairFuture>(ranges.size());
+                Set<InetAddress> neighbours = new HashSet<>();
+                for (Range<Token> range : ranges)
+                    neighbours.addAll(ActiveRepairService.getNeighbors(keyspace, range, dataCenters));
+
+                List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
+                for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspace, columnFamilies))
+                    columnFamilyStores.add(cfs);
+
+                UUID parentSession = null;
+                if (!fullRepair)
+                    parentSession = ActiveRepairService.instance.prepareForRepair(neighbours, ranges, columnFamilyStores);
+
+                List<RepairFuture> futures = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                 {
                     RepairFuture future;
                     try
                     {
-                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, columnFamilies);
+                        future = forceKeyspaceRepair(parentSession, range, keyspace, isSequential, neighbours, columnFamilies);
                     }
                     catch (IllegalArgumentException e)
                     {
@@ -2567,6 +2598,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                     }
                 }
+
                 for (RepairFuture future : futures)
                 {
                     try
@@ -2589,14 +2621,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
                     }
                 }
+                if (!fullRepair)
+                    ActiveRepairService.instance.finishParentSession(parentSession, neighbours);
                 sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
             }
         }, null);
     }
 
-    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies) throws IOException
+
+    public RepairFuture forceKeyspaceRepair(final UUID parentRepairSession,
+                                            final Range<Token> range,
+                                            final String keyspaceName,
+                                            boolean isSequential,
+                                            Set<InetAddress> endpoints,
+                                            String ... columnFamilies) throws IOException
     {
-        ArrayList<String> names = new ArrayList<String>();
+        ArrayList<String> names = new ArrayList<>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
         {
             names.add(cfStore.name);
@@ -2608,7 +2648,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return null;
         }
 
-        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, names.toArray(new String[names.size()]));
+        return ActiveRepairService.instance.submitRepairSession(parentRepairSession, range, keyspaceName, isSequential, endpoints, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 5a1bb22..66afaa1 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -270,12 +270,12 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return Repair command number, or 0 if nothing to repair
      */
-    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies);
+    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException;
 
     /**
      * Same as forceRepairAsync, but handles a specified range
      */
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean fullRepair, String... columnFamilies) throws IOException;
 
 
     /**
@@ -286,14 +286,14 @@ public interface StorageServiceMBean extends NotificationEmitter
      *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
      *
      * @return Repair command number, or 0 if nothing to repair
-     * @see #forceKeyspaceRepair(String, boolean, boolean, String...)
+     * @see #forceKeyspaceRepair(String, boolean, boolean, boolean, String...)
      */
-    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies);
+    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies);
 
     /**
      * Same as forceRepairAsync, but handles a specified range
      */
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies);
+    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies);
 
     /**
      * Triggers proactive repair for given column families, or all columnfamilies for the given keyspace
@@ -302,12 +302,12 @@ public interface StorageServiceMBean extends NotificationEmitter
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException;
 
     /**
      * Triggers proactive repair but only for the node primary range.
      */
-    public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+    public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException;
 
     /**
      * Perform repair of a specific range.
@@ -315,7 +315,7 @@ public interface StorageServiceMBean extends NotificationEmitter
      * This allows incremental repair to be performed by having an external controller submitting repair jobs.
      * Note that the provided range much be a subset of one of the node local range.
      */
-    public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException;
+    public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException;
 
     public void forceTerminateAllRepairSessions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..ff78e84 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -36,6 +37,7 @@ public class StreamPlan
 
     // sessions per InetAddress of the other end.
     private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
+    private final long repairedAt;
 
     private boolean flushBeforeTransfer = true;
 
@@ -46,9 +48,16 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+    }
+
+    public StreamPlan(String description, long repairedAt)
+    {
         this.description = description;
+        this.repairedAt = repairedAt;
     }
 
+
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
@@ -74,7 +83,7 @@ public class StreamPlan
     public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = getOrCreateSession(from);
-        session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+        session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
         return this;
     }
 
@@ -103,7 +112,7 @@ public class StreamPlan
     public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = getOrCreateSession(to);
-        session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+        session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 72c239c..d805bf3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -27,6 +27,9 @@ import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFInputStream;
 
 import org.apache.cassandra.config.Schema;
@@ -37,6 +40,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -48,11 +52,13 @@ import org.apache.cassandra.utils.Pair;
  */
 public class StreamReader
 {
+    private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
     protected final UUID cfId;
     protected final long estimatedKeys;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamSession session;
     protected final Descriptor.Version inputVersion;
+    protected final long repairedAt;
 
     protected Descriptor desc;
 
@@ -63,6 +69,7 @@ public class StreamReader
         this.estimatedKeys = header.estimatedKeys;
         this.sections = header.sections;
         this.inputVersion = new Descriptor.Version(header.version);
+        this.repairedAt = header.repairedAt;
     }
 
     /**
@@ -72,12 +79,13 @@ public class StreamReader
      */
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
+        logger.info("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize);
+        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         try
@@ -101,14 +109,14 @@ public class StreamReader
         }
     }
 
-    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize) throws IOException
+    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
     {
         Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
         desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
 
-        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+        return new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys, repairedAt);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9a2568d..b4d5392 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 9d3fdb2..e8a3fcb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.ActiveRepairService;
 
 public class StreamRequest
 {
@@ -37,12 +38,13 @@ public class StreamRequest
     public final String keyspace;
     public final Collection<Range<Token>> ranges;
     public final Collection<String> columnFamilies = new HashSet<>();
-
-    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+    public final long repairedAt;
+    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt)
     {
         this.keyspace = keyspace;
         this.ranges = ranges;
         this.columnFamilies.addAll(columnFamilies);
+        this.repairedAt = repairedAt;
     }
 
     public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
@@ -50,6 +52,7 @@ public class StreamRequest
         public void serialize(StreamRequest request, DataOutput out, int version) throws IOException
         {
             out.writeUTF(request.keyspace);
+            out.writeLong(request.repairedAt);
             out.writeInt(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {
@@ -64,6 +67,7 @@ public class StreamRequest
         public StreamRequest deserialize(DataInput in, int version) throws IOException
         {
             String keyspace = in.readUTF();
+            long repairedAt = in.readLong();
             int rangeCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangeCount);
             for (int i = 0; i < rangeCount; i++)
@@ -76,12 +80,13 @@ public class StreamRequest
             List<String> columnFamilies = new ArrayList<>(cfCount);
             for (int i = 0; i < cfCount; i++)
                 columnFamilies.add(in.readUTF());
-            return new StreamRequest(keyspace, ranges, columnFamilies);
+            return new StreamRequest(keyspace, ranges, columnFamilies, repairedAt);
         }
 
         public long serializedSize(StreamRequest request, int version)
         {
             int size = TypeSizes.NATIVE.sizeof(request.keyspace);
+            size += TypeSizes.NATIVE.sizeof(request.repairedAt);
             size += TypeSizes.NATIVE.sizeof(request.ranges.size());
             for (Range<Token> range : request.ranges)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index e65f2db..f766bb6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -215,19 +216,34 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
      * @param ranges Ranges to retrieve data
      * @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
      */
-    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, long repairedAt)
     {
-        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+        requests.add(new StreamRequest(keyspace, ranges, columnFamilies, repairedAt));
     }
 
     /**
      * Set up transfer for specific keyspace/ranges/CFs
      *
+     * Used in repair - a streamed sstable in repair will be marked with the given repairedAt time
+     *
      * @param keyspace Transfer keyspace
      * @param ranges Transfer ranges
      * @param columnFamilies Transfer ColumnFamilies
+     * @param flushTables whether to flush the column family stores before selecting the sstables to transfer
+     * @param repairedAt the time the repair started.
      */
-    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables, long repairedAt)
+    {
+        Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
+        if (flushTables)
+            flushSSTables(stores);
+
+        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+        List<SSTableReader> sstables = getSSTablesForRanges(normalizedRanges, stores);
+        addTransferFiles(normalizedRanges, sstables, repairedAt);
+    }
+
+    private Collection<ColumnFamilyStore> getColumnFamilyStores(String keyspace, Collection<String> columnFamilies)
     {
         Collection<ColumnFamilyStore> stores = new HashSet<>();
         // if columnfamilies are not specified, we add all cf under the keyspace
@@ -240,11 +256,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             for (String cf : columnFamilies)
                 stores.add(Keyspace.open(keyspace).getColumnFamilyStore(cf));
         }
+        return stores;
+    }
 
-        if (flushTables)
-            flushSSTables(stores);
-
-        List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+    private List<SSTableReader> getSSTablesForRanges(Collection<Range<Token>> normalizedRanges, Collection<ColumnFamilyStore> stores)
+    {
         List<SSTableReader> sstables = Lists.newLinkedList();
         for (ColumnFamilyStore cfStore : stores)
         {
@@ -254,7 +270,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
             sstables.addAll(view.sstables);
         }
-        addTransferFiles(normalizedRanges, sstables);
+        return sstables;
     }
 
     /**
@@ -263,12 +279,21 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
      *
      * @param ranges Transfer ranges
      * @param sstables Transfer files
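+     * @param overriddenRepairedAt repairedAt time to record for every transferred sstable (used by repair); if it equals ActiveRepairService.UNREPAIRED_SSTABLE, each sstable's own repairedAt metadata is used instead.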
      */
-    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables, long overriddenRepairedAt)
     {
         List<SSTableStreamingSections> sstableDetails = new ArrayList<>(sstables.size());
         for (SSTableReader sstable : sstables)
-            sstableDetails.add(new SSTableStreamingSections(sstable, sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+        {
+            long repairedAt = overriddenRepairedAt;
+            if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+                repairedAt = sstable.getSSTableMetadata().repairedAt;
+            sstableDetails.add(new SSTableStreamingSections(sstable,
+                                                            sstable.getPositionsForRanges(ranges),
+                                                            sstable.estimatedKeysForRanges(ranges),
+                                                            repairedAt));
+        }
 
         addTransferFiles(sstableDetails);
     }
@@ -291,7 +316,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
                 task = new StreamTransferTask(this, cfId);
                 transfers.put(cfId, task);
             }
-            task.addTransferFile(details.sstable, details.estimatedKeys, details.sections);
+            task.addTransferFile(details.sstable, details.estimatedKeys, details.sections, details.repairedAt);
         }
     }
 
@@ -300,12 +325,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         public final SSTableReader sstable;
         public final List<Pair<Long, Long>> sections;
         public final long estimatedKeys;
+        public final long repairedAt;
 
-        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys)
+        public SSTableStreamingSections(SSTableReader sstable, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt)
         {
             this.sstable = sstable;
             this.sections = sections;
             this.estimatedKeys = estimatedKeys;
+            this.repairedAt = repairedAt;
         }
     }
 
@@ -407,7 +434,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             startStreamingFiles();
     }
 
-    /**
+    /**
      * Call back for handling exception during streaming.
      *
      * @param e thrown exception
@@ -430,7 +457,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
         // prepare tasks
         state(State.PREPARING);
         for (StreamRequest request : requests)
-            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true, request.repairedAt); // always flush on stream request
         for (StreamSummary summary : summaries)
             prepareReceiving(summary);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 8e461cc..13171f4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -40,10 +40,10 @@ public class StreamTransferTask extends StreamTask
         super(session, cfId);
     }
 
-    public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+    public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
-        OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+        OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt);
         files.put(message.header.sequenceNumber, message);
         totalSize += message.header.size();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index d294e4a..3c13d11 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -24,11 +24,15 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
@@ -41,6 +45,8 @@ import org.apache.cassandra.utils.Pair;
  */
 public class CompressedStreamReader extends StreamReader
 {
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamReader.class);
+
     protected final CompressionInfo compressionInfo;
 
     public CompressedStreamReader(FileMessageHeader header, StreamSession session)
@@ -56,12 +62,13 @@ public class CompressedStreamReader extends StreamReader
     @Override
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
+        logger.info("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize);
+        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 24f1e04..3e86027 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -46,13 +46,15 @@ public class FileMessageHeader
     public final long estimatedKeys;
     public final List<Pair<Long, Long>> sections;
     public final CompressionInfo compressionInfo;
+    public final long repairedAt;
 
     public FileMessageHeader(UUID cfId,
                              int sequenceNumber,
                              String version,
                              long estimatedKeys,
                              List<Pair<Long, Long>> sections,
-                             CompressionInfo compressionInfo)
+                             CompressionInfo compressionInfo,
+                             long repairedAt)
     {
         this.cfId = cfId;
         this.sequenceNumber = sequenceNumber;
@@ -60,6 +62,7 @@ public class FileMessageHeader
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
         this.compressionInfo = compressionInfo;
+        this.repairedAt = repairedAt;
     }
 
     /**
@@ -92,6 +95,7 @@ public class FileMessageHeader
         sb.append(", estimated keys: ").append(estimatedKeys);
         sb.append(", transfer size: ").append(size());
         sb.append(", compressed?: ").append(compressionInfo != null);
+        sb.append(", repairedAt: ").append(repairedAt);
         sb.append(')');
         return sb.toString();
     }
@@ -129,6 +133,7 @@ public class FileMessageHeader
                 out.writeLong(section.right);
             }
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+            out.writeLong(header.repairedAt);
         }
 
         public FileMessageHeader deserialize(DataInput in, int version) throws IOException
@@ -142,7 +147,8 @@ public class FileMessageHeader
             for (int k = 0; k < count; k++)
                 sections.add(Pair.create(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
-            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo);
+            long repairedAt = in.readLong();
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo, repairedAt);
         }
 
         public long serializedSize(FileMessageHeader header, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 1fa115f..82f6c01 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -64,7 +64,7 @@ public class OutgoingFileMessage extends StreamMessage
     public FileMessageHeader header;
     public SSTableReader sstable;
 
-    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections)
+    public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt)
     {
         super(Type.FILE);
         this.sstable = sstable;
@@ -76,11 +76,12 @@ public class OutgoingFileMessage extends StreamMessage
             compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
         }
         this.header = new FileMessageHeader(sstable.metadata.cfId,
-                sequenceNumber,
-                sstable.descriptor.version.toString(),
-                estimatedKeys,
-                sections,
-                compressionInfo);
+                                            sequenceNumber,
+                                            sstable.descriptor.version.toString(),
+                                            estimatedKeys,
+                                            sections,
+                                            compressionInfo,
+                                            repairedAt);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index a342866..2fcdd63 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -211,19 +211,19 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
     }
 
-    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
+        ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
     }
 
-    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies) throws IOException
+    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
+            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange, fullRepair))
                 failed = true;
         }
         catch (Exception e)
@@ -244,14 +244,14 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
-    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, startToken, endToken))
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, startToken, endToken, fullRepair))
                 failed = true;
         }
         catch (Exception e)
@@ -272,14 +272,14 @@ public class NodeProbe implements AutoCloseable
         }
     }
 
-    public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairPrimaryRange(String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        ssProxy.forceKeyspaceRepairPrimaryRange(keyspaceName, isSequential, isLocal, columnFamilies);
+        ssProxy.forceKeyspaceRepairPrimaryRange(keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
     }
 
-    public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairRange(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) throws IOException
     {
-        ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, columnFamilies);
+        ssProxy.forceKeyspaceRepairRange(beginToken, endToken, keyspaceName, isSequential, isLocal, fullRepair, columnFamilies);
     }
 
     public void invalidateCounterCache()
@@ -1237,16 +1237,16 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly) throws Exception
+    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, boolean primaryRangeOnly, boolean fullRepair) throws Exception
     {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, columnFamilies);
+        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly, fullRepair, columnFamilies);
         waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken) throws Exception
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, String startToken, String endToken, boolean fullRepair) throws Exception
     {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, columnFamilies);
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, fullRepair, columnFamilies);
         waitForRepair();
         return success;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 10e581c..94bce74 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1556,6 +1556,9 @@ public class NodeTool
         @Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
         private boolean primaryRange = false;
 
+        @Option(title = "incremental_repair", name = {"-inc", "--incremental"}, description = "Use -inc to use the new incremental repair")
+        private boolean incrementalRepair = false;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1571,11 +1574,10 @@ public class NodeTool
                         dataCenters = newArrayList(specificDataCenters);
                     else if (localDC)
                         dataCenters = newArrayList(probe.getDataCenter());
-
                     if (!startToken.isEmpty() || !endToken.isEmpty())
-                        probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken);
+                        probe.forceRepairRangeAsync(System.out, keyspace, !parallel, dataCenters, startToken, endToken, !incrementalRepair);
                     else
-                        probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, cfnames);
+                        probe.forceRepairAsync(System.out, keyspace, !parallel, dataCenters, primaryRange, !incrementalRepair, cfnames);
                 } catch (Exception e)
                 {
                     throw new RuntimeException("Error occurred during repair", e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index 71b687b..2cb284e 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParser;
@@ -366,7 +367,7 @@ public class SSTableImport
         Object[] data = parser.readValueAs(new TypeReference<Object[]>(){});
 
         keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
@@ -442,7 +443,7 @@ public class SSTableImport
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
         parser = getParser(jsonFile); // renewing parser
-        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
 
         int lineNumber = 1;
         DecoratedKey prevStoredKey = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index 0ab94c4..374ef79 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -66,6 +66,7 @@ public class SSTableMetadataViewer
                     out.printf("Compression ratio: %s%n", stats.compressionRatio);
                     out.printf("Estimated droppable tombstones: %s%n", stats.getEstimatedDroppableTombstoneRatio((int) (System.currentTimeMillis() / 1000)));
                     out.printf("SSTable Level: %d%n", stats.sstableLevel);
+                    out.printf("Repaired at: %d%n", stats.repairedAt);
                     out.println(stats.replayPosition);
                     out.println("Estimated tombstone drop times:%n");
                     for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 54fc22f..90e7123 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -72,7 +72,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
         {
             while (true)
             {
-                final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE);
+                final AbstractCompactionTask t = lcs.getMaximalTask(Integer.MIN_VALUE).iterator().next();
                 if (t == null)
                     break;
                 tasks.add(new Runnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 9bba196..311d21b 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -36,18 +36,19 @@ import java.util.Map;
 
 public class AbstractSerializationsTester extends SchemaLoader
 {
-    protected static final String CUR_VER = System.getProperty("cassandra.version", "2.0");
+    protected static final String CUR_VER = System.getProperty("cassandra.version", "2.1");
     protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
     {{
         put("0.7", 1);
         put("1.0", 3);
         put("1.2", MessagingService.VERSION_12);
         put("2.0", MessagingService.VERSION_20);
+        put("2.1", MessagingService.VERSION_21);
     }};
 
     protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");
 
-    protected final int getVersion()
+    protected static int getVersion()
     {
         return VERSION_MAP.get(CUR_VER);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 92ca14e..7bc0256 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -1596,6 +1597,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
                 return new SSTableWriter(makeFilename(directory, metadata.ksName, metadata.cfName),
                                          0,
+                                         ActiveRepairService.UNREPAIRED_SSTABLE,
                                          metadata,
                                          StorageService.getPartitioner(),
                                          collector);
@@ -1652,6 +1654,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
                 return new SSTableWriter(file,
                                          0,
+                                         ActiveRepairService.UNREPAIRED_SSTABLE,
                                          metadata,
                                          StorageService.getPartitioner(),
                                          collector);


[3/3] git commit: Avoid repairing already repaired data.

Posted by ma...@apache.org.
Avoid repairing already repaired data.

Patch by lyubent, yukim and marcuse, reviewed by yukim for CASSANDRA-5351


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7b72140
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7b72140
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7b72140

Branch: refs/heads/trunk
Commit: a7b72140b61cf1998963750c21d6f6080f02d6bb
Parents: bcfaeaa
Author: Marcus Eriksson <ma...@apache.org>
Authored: Sat Feb 8 08:46:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Sat Feb 8 08:46:24 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   5 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  27 +++
 .../org/apache/cassandra/db/DataTracker.java    |   8 +
 src/java/org/apache/cassandra/db/Memtable.java  |   3 +
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        | 176 ++++++++++++++++-
 .../cassandra/db/compaction/CompactionTask.java |  23 ++-
 .../compaction/LeveledCompactionStrategy.java   |  22 ++-
 .../db/compaction/LeveledManifest.java          | 195 +++++++++++++++----
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |   5 +-
 .../SizeTieredCompactionStrategy.java           |  46 ++++-
 .../cassandra/db/compaction/Upgrader.java       |   6 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +
 .../apache/cassandra/io/sstable/Descriptor.java |   2 +
 .../cassandra/io/sstable/SSTableLoader.java     |   3 +-
 .../cassandra/io/sstable/SSTableReader.java     |   6 +
 .../cassandra/io/sstable/SSTableWriter.java     |   9 +-
 .../sstable/metadata/IMetadataSerializer.java   |   5 +
 .../metadata/LegacyMetadataSerializer.java      |   4 +-
 .../io/sstable/metadata/MetadataCollector.java  |   9 +-
 .../io/sstable/metadata/MetadataSerializer.java |  16 ++
 .../io/sstable/metadata/StatsMetadata.java      |  35 +++-
 .../org/apache/cassandra/repair/RepairJob.java  |   4 +-
 .../apache/cassandra/repair/RepairJobDesc.java  |  35 +++-
 .../repair/RepairMessageVerbHandler.java        |  47 +++++
 .../apache/cassandra/repair/RepairSession.java  |  17 +-
 .../cassandra/repair/StreamingRepairTask.java   |   7 +-
 .../repair/messages/RepairMessage.java          |   4 +-
 .../cassandra/service/ActiveRepairService.java  | 188 +++++++++++++++++-
 .../cassandra/service/StorageService.java       | 102 +++++++---
 .../cassandra/service/StorageServiceMBean.java  |  16 +-
 .../apache/cassandra/streaming/StreamPlan.java  |  13 +-
 .../cassandra/streaming/StreamReader.java       |  14 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   1 +
 .../cassandra/streaming/StreamRequest.java      |  11 +-
 .../cassandra/streaming/StreamSession.java      |  55 ++++--
 .../cassandra/streaming/StreamTransferTask.java |   4 +-
 .../compress/CompressedStreamReader.java        |   9 +-
 .../streaming/messages/FileMessageHeader.java   |  10 +-
 .../streaming/messages/OutgoingFileMessage.java |  13 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  28 +--
 .../org/apache/cassandra/tools/NodeTool.java    |   8 +-
 .../apache/cassandra/tools/SSTableImport.java   |   5 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |   1 +
 .../LongLeveledCompactionStrategyTest.java      |   2 +-
 .../cassandra/AbstractSerializationsTester.java |   5 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   3 +
 .../LeveledCompactionStrategyTest.java          | 130 +++++++++++--
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   3 +-
 .../metadata/MetadataSerializerTest.java        |   2 +-
 .../cassandra/repair/DifferencerTest.java       |  16 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   4 +-
 .../service/AntiEntropyServiceTestAbstract.java |   2 +-
 .../cassandra/service/SerializationsTest.java   |   3 +-
 .../streaming/StreamingTransferTest.java        |   2 +-
 .../cassandra/tools/SSTableExportTest.java      |  15 +-
 60 files changed, 1170 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 89de179..802f515 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * New counters implementation (CASSANDRA-6504)
  * Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630)
  * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
+ * Avoid repairing already repaired data (CASSANDRA-5351)
 
 2.0.6
  * Failure detector correctly converts initial value to nanos (CASSANDRA-6658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9dd2ad6..3b3a64a 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,11 @@ New features
      Note that existing directories are used as is, so only newly created
      directories after upgrade have new directory name format.
    - Saved key cache files also have ColumnFamily ID in their file name.
+   - It is now possible to do incremental repairs: sstables that have been
+     repaired are marked with a timestamp and not included in the next
+     repair session. Use nodetool repair -par -inc to use this feature.
+     A tool to manually mark/unmark sstables as repaired is available in
+     tools/bin/sstablerepairedset.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 18f0612..961d126 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -74,6 +74,7 @@ import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 
@@ -1712,6 +1713,32 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return markCurrentViewReferenced().sstables;
     }
 
+    public Set<SSTableReader> getUnrepairedSSTables()
+    {
+        Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables());
+        Iterator<SSTableReader> sstableIterator = unRepairedSSTables.iterator();
+        while(sstableIterator.hasNext())
+        {
+            SSTableReader sstable = sstableIterator.next();
+            if (sstable.isRepaired())
+                sstableIterator.remove();
+        }
+        return unRepairedSSTables;
+    }
+
+    public Set<SSTableReader> getRepairedSSTables()
+    {
+        Set<SSTableReader> repairedSSTables = new HashSet<>(getSSTables());
+        Iterator<SSTableReader> sstableIterator = repairedSSTables.iterator();
+        while(sstableIterator.hasNext())
+        {
+            SSTableReader sstable = sstableIterator.next();
+            if (!sstable.isRepaired())
+                sstableIterator.remove();
+        }
+        return repairedSSTables;
+    }
+
     abstract class AbstractViewSSTableFinder
     {
         abstract List<SSTableReader> findSSTables(DataTracker.View view);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d90c0ff..e51f380 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -475,6 +475,14 @@ public class DataTracker
             subscriber.handleNotification(notification, this);
     }
 
+    public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
+    {
+        INotification notification = new SSTableRepairStatusChanged(repairStatusesChanged);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+
+    }
+
     public void notifyDeleting(SSTableReader deleting)
     {
         INotification notification = new SSTableDeletingNotification(deleting);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 412a0a8..9e76e6f 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.utils.memory.ContextAllocator;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.Pool;
 import org.apache.cassandra.utils.memory.PoolAllocator;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -353,6 +354,7 @@ public class Memtable
 
                 if (writer.getFilePointer() > 0)
                 {
+                    // temp sstables should contain non-repaired data.
                     ssTable = writer.closeAndOpenReader();
                     logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
                                               ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
@@ -379,6 +381,7 @@ public class Memtable
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
             return new SSTableWriter(filename,
                                      rows.size(),
+                                     ActiveRepairService.UNREPAIRED_SSTABLE,
                                      cfs.metadata,
                                      cfs.partitioner,
                                      sstableMetadataCollector);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index c512097..4efe0a6 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -154,7 +154,7 @@ public abstract class AbstractCompactionStrategy
      *
      * Is responsible for marking its sstables as compaction-pending.
      */
-    public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);
+    public abstract Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore);
 
     /**
      * @param sstables SSTables to compact. Must be marked as compacting.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 3a19ca7..1fc1eda 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.AlwaysPresentFilter;
 /**
  * Manage compaction options.
  */
-public class CompactionController
+public class CompactionController implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c619b9e..b964c5b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 
@@ -280,6 +281,80 @@ public class CompactionManager implements CompactionManagerMBean
         });
     }
 
+    public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
+                                          final Collection<Range<Token>> ranges,
+                                          final Collection<SSTableReader> validatedForRepair,
+                                          final long repairedAt)
+    {
+        Runnable runnable = new WrappedRunnable() {
+
+            @Override
+            public void runMayThrow() throws Exception
+            {
+                performAnticompaction(cfs, ranges, validatedForRepair, repairedAt);
+            }
+        };
+        return executor.submit(runnable);
+    }
+
+    /**
+     * Make sure the {validatedForRepair} are marked for compaction before calling this.
+     *
+     * @param cfs
+     * @param ranges Ranges that the repair was carried out on
+     * @param validatedForRepair SSTables containing the repaired ranges
+     * @throws InterruptedException, ExecutionException, IOException
+     */
+    public void performAnticompaction(ColumnFamilyStore cfs,
+                                      Collection<Range<Token>> ranges,
+                                      Collection<SSTableReader> validatedForRepair,
+                                      long repairedAt) throws InterruptedException, ExecutionException, IOException
+    {
+        logger.info("Starting anticompaction for ranges {}", ranges);
+        Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
+        Set<SSTableReader> nonAnticompacting = new HashSet<>();
+        Iterator<SSTableReader> sstableIterator = sstables.iterator();
+        while (sstableIterator.hasNext())
+        {
+            SSTableReader sstable = sstableIterator.next();
+            for (Range<Token> r : Range.normalize(ranges))
+            {
+                Range<Token> sstableRange = new Range<>(sstable.first.token, sstable.last.token, sstable.partitioner);
+                if (r.contains(sstableRange))
+                {
+                    logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+                    sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+                    sstable.reloadSSTableMetadata();
+                    mutatedRepairStatuses.add(sstable);
+                    sstableIterator.remove();
+                    break;
+                }
+                else if (!sstableRange.intersects(r))
+                {
+                    logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+                    nonAnticompacting.add(sstable);
+                    sstableIterator.remove();
+                }
+                else
+                {
+                    logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                }
+            }
+        }
+        cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+        cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+        Collection<SSTableReader> antiCompactedSSTables = null;
+        if (!sstables.isEmpty())
+            antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
+        // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
+        if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
+            cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+        SSTableReader.releaseReferences(sstables);
+        cfs.getDataTracker().unmarkCompacting(sstables);
+        logger.info(String.format("Completed anticompaction successfully"));
+    }
+
     public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
         submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
@@ -290,14 +365,15 @@ public class CompactionManager implements CompactionManagerMBean
         // here we compute the task off the compaction executor, so having that present doesn't
         // confuse runWithCompactionsDisabled -- i.e., we don't want to deadlock ourselves, waiting
         // for ourselves to finish/acknowledge cancellation before continuing.
-        final AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
+        final Collection<AbstractCompactionTask> tasks = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
         Runnable runnable = new WrappedRunnable()
         {
             protected void runMayThrow() throws IOException
             {
-                if (task == null)
+                if (tasks == null)
                     return;
-                task.execute(metrics);
+                for (AbstractCompactionTask task : tasks)
+                    task.execute(metrics);
             }
         };
         return executor.submit(runnable);
@@ -561,6 +637,7 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableWriter writer = createWriter(cfs,
                                                 compactionFileLocation,
                                                 expectedBloomFilterSize,
+                                                sstable.getSSTableMetadata().repairedAt,
                                                 sstable);
             SSTableReader newSstable = null;
             try
@@ -721,11 +798,13 @@ public class CompactionManager implements CompactionManagerMBean
     public static SSTableWriter createWriter(ColumnFamilyStore cfs,
                                              File compactionFileLocation,
                                              int expectedBloomFilterSize,
+                                             long repairedAt,
                                              SSTableReader sstable)
     {
         FileUtils.createDirectory(compactionFileLocation);
         return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
                                  expectedBloomFilterSize,
+                                 repairedAt,
                                  cfs.metadata,
                                  cfs.partitioner,
                                  new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
@@ -764,10 +843,13 @@ public class CompactionManager implements CompactionManagerMBean
         {
             // flush first so everyone is validating data that is as similar as possible
             StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-
             // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
             // instead so they won't be cleaned up if they do get compacted during the validation
-            sstables = cfs.markCurrentSSTablesReferenced();
+            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                sstables = cfs.markCurrentSSTablesReferenced();
+            else
+                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
+
             if (validator.gcBefore > 0)
                 gcBefore = validator.gcBefore;
             else
@@ -809,6 +891,90 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     /**
+     * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
+     * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
+     * and subsequently deleted.
+     * @param cfs
+     * @param repairedSSTables
+     * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
+     * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
+     */
+    private Collection<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> repairedSSTables, long repairedAt)
+    {
+        List<SSTableReader> anticompactedSSTables = new ArrayList<>();
+        // totals across all sstables; they are only used for the summary logging at the end
+        int repairedKeyCount = 0;
+        int unrepairedKeyCount = 0;
+        // TODO(5351): we can do better here:
+        int expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(repairedSSTables)));
+        logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+        // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
+        for (SSTableReader sstable : repairedSSTables)
+        {
+            // check that compaction hasn't stolen any sstables used in previous repair sessions
+            // if we need to skip the anticompaction, it will be carried out by the next repair
+            if (!new File(sstable.getFilename()).exists())
+            {
+                logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+                continue;
+            }
+
+            logger.info("Anticompacting {}", sstable);
+            File destination = cfs.directories.getDirectoryForNewSSTables();
+            SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+            SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+            // per-sstable counters: keeping or aborting a writer must depend only on the rows of *this* sstable
+            int repairedKeys = 0;
+            int unrepairedKeys = 0;
+            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+            {
+                List<ICompactionScanner> scanners = cfs.getCompactionStrategy().getScanners(Arrays.asList(sstable));
+                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
+                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                {
+                    while (iter.hasNext())
+                    {
+                        AbstractCompactedRow row = iter.next();
+                        // if current range from sstable is repaired, save it into the new repaired sstable
+                        if (Range.isInRanges(row.key.token, ranges))
+                        {
+                            repairedSSTableWriter.append(row);
+                            repairedKeys++;
+                        }
+                        // otherwise save into the new 'non-repaired' table
+                        else
+                        {
+                            unRepairedSSTableWriter.append(row);
+                            unrepairedKeys++;
+                        }
+                    }
+                }
+                repairedKeyCount += repairedKeys;
+                unrepairedKeyCount += unrepairedKeys;
+                // keep the repaired writer only if this sstable put rows into it, otherwise abort it
+                if (repairedKeys > 0)
+                    anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
+                else
+                    repairedSSTableWriter.abort();
+                // same for the unrepaired writer, which was created with ActiveRepairService.UNREPAIRED_SSTABLE
+                if (unrepairedKeys > 0)
+                    anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
+                else
+                    unRepairedSSTableWriter.abort();
+            }
+            catch (Throwable e)
+            {
+                logger.error("Error anticompacting " + sstable, e);
+                repairedSSTableWriter.abort();
+                unRepairedSSTableWriter.abort();
+            }
+        }
+        logger.debug("Repaired {} keys of {} for {}/{}", repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
+        logger.info("Anticompaction completed successfully, anticompacted from {} to {} sstable(s).", repairedSSTables.size(), anticompactedSSTables.size());
+        return anticompactedSSTables;
+    }
+
+    /**
      * Is not scheduled, because it is performing disjoint work from sstable compaction.
      */
     public Future<?> submitIndexBuild(final SecondaryIndexBuilder builder)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1bb5df8..84b22d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public class CompactionTask extends AbstractCompactionTask
@@ -99,6 +100,9 @@ public class CompactionTask extends AbstractCompactionTask
         // it is not empty, it may compact down to nothing if all rows are deleted.
         assert sstables != null && sstableDirectory != null;
 
+        if (toCompact.size() == 0)
+            return;
+
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
         // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
@@ -143,7 +147,7 @@ public class CompactionTask extends AbstractCompactionTask
 
         Collection<SSTableReader> sstables = new ArrayList<>();
         Collection<SSTableWriter> writers = new ArrayList<>();
-
+        long minRepairedAt = getMinRepairedAt(actuallyCompact);
         if (collector != null)
             collector.beginCompaction(ci);
         try
@@ -157,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
                 return;
             }
 
-            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
             writers.add(writer);
             while (iter.hasNext())
             {
@@ -191,7 +195,7 @@ public class CompactionTask extends AbstractCompactionTask
                 {
                     // tmp = false because later we want to query it with descriptor from SSTableReader
                     cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
+                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
                     writers.add(writer);
                     cachedKeys = new HashMap<>();
                 }
@@ -286,10 +290,21 @@ public class CompactionTask extends AbstractCompactionTask
         logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
-    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+    private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+    {
+        // Long.MAX_VALUE doubles as the "nothing lowered it yet" marker
+        long smallest = Long.MAX_VALUE;
+        for (SSTableReader candidate : actuallyCompact)
+            smallest = Math.min(candidate.getSSTableMetadata().repairedAt, smallest);
+        // marker untouched (e.g. empty input): report the result as unrepaired
+        return smallest == Long.MAX_VALUE ? ActiveRepairService.UNREPAIRED_SSTABLE : smallest;
+    }
+
+    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
     {
         return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
                                  keysPerSSTable,
+                                 repairedAt,
                                  cfs.metadata,
                                  cfs.partitioner,
                                  new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d7c4f9f..590e8a6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
 {
@@ -108,11 +109,13 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
     {
         if (!isEnabled())
             return null;
-
-        return getMaximalTask(gcBefore);
+        Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
+        if (tasks == null || tasks.size() == 0)
+            return null;
+        return tasks.iterator().next();
     }
 
-    public AbstractCompactionTask getMaximalTask(int gcBefore)
+    public Collection<AbstractCompactionTask> getMaximalTask(int gcBefore)
     {
         while (true)
         {
@@ -141,7 +144,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             {
                 LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes);
                 newTask.setCompactionType(op);
-                return newTask;
+                return Arrays.<AbstractCompactionTask>asList(newTask);
             }
         }
     }
@@ -168,6 +171,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
             manifest.replace(listChangedNotification.removed, listChangedNotification.added);
         }
+        else if (notification instanceof SSTableRepairStatusChanged)
+        {
+            manifest.repairStatusChanged(((SSTableRepairStatusChanged) notification).sstable);
+        }
     }
 
     public long getMaxSSTableBytes()
@@ -179,7 +186,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
-            byLevel.get(sstable.getSSTableLevel()).add(sstable);
+        {
+            if (manifest.hasRepairedData() && !sstable.isRepaired())
+                byLevel.get(0).add(sstable);
+            else
+                byLevel.get(sstable.getSSTableLevel()).add(sstable);
+        }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
         for (Integer level : byLevel.keySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index c8459c9..cab726d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -50,14 +50,19 @@ public class LeveledManifest
     private static final int MAX_COMPACTING_L0 = 32;
 
     private final ColumnFamilyStore cfs;
-    private final List<SSTableReader>[] generations;
+    @VisibleForTesting
+    protected final List<SSTableReader>[] generations;
+    @VisibleForTesting
+    protected final List<SSTableReader> unrepairedL0;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
+    private boolean hasRepairedData = false;
 
     private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
     {
         this.cfs = cfs;
+        this.hasRepairedData = cfs.getRepairedSSTables().size() > 0;
         this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024 * 1024;
         this.options = options;
 
@@ -69,9 +74,10 @@ public class LeveledManifest
         lastCompactedKeys = new RowPosition[n];
         for (int i = 0; i < generations.length; i++)
         {
-            generations[i] = new ArrayList<SSTableReader>();
+            generations[i] = new ArrayList<>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
+        unrepairedL0 = new ArrayList<>();
     }
 
     public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -97,12 +103,74 @@ public class LeveledManifest
 
     public synchronized void add(SSTableReader reader)
     {
+        if (!hasRepairedData && reader.isRepaired())
+        {
+            // this is the first repaired sstable we get - we need to
+            // rebuild the entire manifest, unrepaired data should be
+            // in unrepairedL0. Note that we keep the sstable level in
+            // the sstable metadata since we are likely to be able to
+            // re-add it at a good level later (during anticompaction
+            // for example).
+            hasRepairedData = true;
+            rebuildManifestAfterFirstRepair();
+        }
+
         int level = reader.getSSTableLevel();
-        assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
-        logDistribution();
+        if (hasRepairedData && !reader.isRepaired())
+        {
+            logger.debug("Adding unrepaired {} to unrepaired L0", reader);
+            unrepairedL0.add(reader);
+        }
+        else
+        {
+            assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
+            logDistribution();
+            if (canAddSSTable(reader))
+            {
+                // adding the sstable does not cause overlap in the level
+                logger.debug("Adding {} to L{}", reader, level);
+                generations[level].add(reader);
+            }
+            else
+            {
+                // this can happen if:
+                // * a compaction has promoted an overlapping sstable to the given level, or
+                // * we promote a non-repaired sstable to repaired at level > 0, but an ongoing compaction
+                //   was also supposed to add an sstable at the given level.
+                //
+                // The add(..):ed sstable will be sent to level 0
+                try
+                {
+                    reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
+                    reader.reloadSSTableMetadata();
+                }
+                catch (IOException e)
+                {
+                    logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
+                }
+                generations[0].add(reader);
+            }
+        }
+
+    }
+
 
-        logger.debug("Adding {} to L{}", reader, level);
-        generations[level].add(reader);
+    /**
+     * While there is no repaired data we run standard LCS, so all sstables sit in the leveling.
+     * Once the first repaired sstable arrives, empty levels 1 and up exactly once each and
+     * re-add their sstables through add(..), which moves unrepaired ones to unrepairedL0.
+     */
+    private void rebuildManifestAfterFirstRepair()
+    {
+        for (int i = 1; i < generations.length; i++)
+        {
+            // snapshot first: resetting the level inside the re-add loop would drop
+            // every repaired sstable that add(..) had just put back into this level
+            List<SSTableReader> previous = new ArrayList<>(getLevel(i));
+            generations[i] = new ArrayList<>();
+            for (SSTableReader sstable : previous)
+                add(sstable);
+        }
+    }
 
     /**
@@ -115,7 +183,7 @@ public class LeveledManifest
         // This is needed since we need to decide before the actual compaction what level they will be in.
         // This should be safe, we might skip levels where the compacted data could have fit but that should be ok.
         while (maxBytesForLevel(newLevel) < SSTableReader.getTotalBytes(added)
-               && generations[(newLevel + 1)].isEmpty())
+               && getLevel(newLevel + 1).isEmpty())
         {
             newLevel++;
         }
@@ -178,6 +246,31 @@ public class LeveledManifest
         }
     }
 
+    /**
+     * Tells whether the sstable fits into the level recorded in its metadata without
+     * making any two sstables of that level overlap. Level 0 always accepts.
+     * @param sstable the sstable to add
+     * @return true if it is safe to add the sstable in the level.
+     */
+    private boolean canAddSSTable(SSTableReader sstable)
+    {
+        int targetLevel = sstable.getSSTableLevel();
+        if (targetLevel == 0)
+            return true;
+
+        // work on a sorted copy of the level that already contains the newcomer
+        List<SSTableReader> withNewcomer = new ArrayList<>(generations[targetLevel]);
+        withNewcomer.add(sstable);
+        Collections.sort(withNewcomer, SSTableReader.sstableComparator);
+        // neighbours overlap when one starts at or before the end of the one sorted ahead of it
+        for (int i = 1; i < withNewcomer.size(); i++)
+        {
+            if (withNewcomer.get(i).first.compareTo(withNewcomer.get(i - 1).last) <= 0)
+                return false;
+        }
+        return true;
+    }
+
     private synchronized void sendBackToL0(SSTableReader sstable)
     {
         remove(sstable);
@@ -193,6 +286,15 @@ public class LeveledManifest
         }
     }
 
+    public synchronized void repairStatusChanged(Collection<SSTableReader> sstables)
+    {
+        for (SSTableReader changed : sstables)
+        {
+            remove(changed); // take it out of the level list / unrepairedL0 it currently sits in
+            add(changed);    // add(..) files it again according to its current repaired status
+        }
+    }
+
     private String toString(Collection<SSTableReader> sstables)
     {
         StringBuilder builder = new StringBuilder();
@@ -225,6 +327,18 @@ public class LeveledManifest
      */
     public synchronized CompactionCandidate getCompactionCandidates()
     {
+        // if we don't have any repaired data, continue as usual
+        if (hasRepairedData)
+        {
+            Collection<SSTableReader> unrepairedMostInterresting = getSSTablesForSTCS(unrepairedL0);
+            if (!unrepairedMostInterresting.isEmpty())
+            {
+                logger.info("Unrepaired data is most interresting, compacting {} sstables with STCS", unrepairedMostInterresting.size());
+                for (SSTableReader reader : unrepairedMostInterresting)
+                    assert !reader.isRepaired();
+                return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
+            }
+        }
         // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
         // compacts the level with the highest score. But this falls apart spectacularly once you
         // get behind.  Consider this set of levels:
@@ -254,7 +368,7 @@ public class LeveledManifest
         // it can help a lot.
         for (int i = generations.length - 1; i > 0; i--)
         {
-            List<SSTableReader> sstables = generations[i];
+            List<SSTableReader> sstables = getLevel(i);
             if (sstables.isEmpty())
                 continue; // mostly this just avoids polluting the debug log with zero scores
             // we want to calculate score excluding compacting ones
@@ -266,15 +380,9 @@ public class LeveledManifest
             if (score > 1.001)
             {
                 // before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
-                if (generations[0].size() > MAX_COMPACTING_L0)
+                if (getLevel(0).size() > MAX_COMPACTING_L0)
                 {
-                    Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(generations[0]);
-                    List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
-                    List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
-                                                                                                options.bucketHigh,
-                                                                                                options.bucketLow,
-                                                                                                options.minSSTableSize);
-                    List<SSTableReader> mostInteresting = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
+                    List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
                     if (!mostInteresting.isEmpty())
                     {
                         logger.debug("L0 is too far behind, performing size-tiering there first");
@@ -292,7 +400,7 @@ public class LeveledManifest
         }
 
         // Higher levels are happy, time for a standard, non-STCS L0 compaction
-        if (generations[0].isEmpty())
+        if (getLevel(0).isEmpty())
             return null;
         Collection<SSTableReader> candidates = getCandidatesFor(0);
         if (candidates.isEmpty())
@@ -300,18 +408,29 @@ public class LeveledManifest
         return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
     }
 
+    private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
+    {
+        // narrow the input with getUncompactingSSTables(..) and filterSuspectSSTables(..) first
+        Iterable<SSTableReader> idle = cfs.getDataTracker().getUncompactingSSTables(sstables);
+        List<Pair<SSTableReader,Long>> sized = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(idle));
+        // then let STCS bucket what is left, using the size-tiered options held by this manifest
+        List<List<SSTableReader>> tiers = SizeTieredCompactionStrategy.getBuckets(sized, options.bucketHigh, options.bucketLow, options.minSSTableSize);
+        // NOTE(review): 4 and 32 look like min/max bucket thresholds - confirm against mostInterestingBucket
+        return SizeTieredCompactionStrategy.mostInterestingBucket(tiers, 4, 32);
+    }
+
     public synchronized int getLevelSize(int i)
     {
         if (i >= generations.length)
             throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1));
-        return generations[i].size();
+        return getLevel(i).size();
     }
 
     public synchronized int[] getAllLevelSize()
     {
         int[] counts = new int[generations.length];
         for (int i = 0; i < counts.length; i++)
-            counts[i] = generations[i].size();
+            counts[i] = getLevel(i).size();
         return counts;
     }
 
@@ -321,10 +440,10 @@ public class LeveledManifest
         {
             for (int i = 0; i < generations.length; i++)
             {
-                if (!generations[i].isEmpty())
+                if (!getLevel(i).isEmpty())
                 {
                     logger.debug("L{} contains {} SSTables ({} bytes) in {}",
-                                 i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this);
+                                 i, getLevel(i).size(), SSTableReader.getTotalBytes(getLevel(i)), this);
                 }
             }
         }
@@ -336,6 +455,7 @@ public class LeveledManifest
         int level = reader.getSSTableLevel();
         assert level >= 0 : reader + " not present in manifest: "+level;
         generations[level].remove(reader);
+        unrepairedL0.remove(reader);
         return level;
     }
 
@@ -404,14 +524,14 @@ public class LeveledManifest
      */
     private Collection<SSTableReader> getCandidatesFor(int level)
     {
-        assert !generations[level].isEmpty();
+        assert !getLevel(level).isEmpty();
         logger.debug("Choosing candidates for L{}", level);
 
         final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
 
         if (level == 0)
         {
-            Set<SSTableReader> compactingL0 = ImmutableSet.copyOf(Iterables.filter(generations[0], Predicates.in(compacting)));
+            Set<SSTableReader> compactingL0 = ImmutableSet.copyOf(Iterables.filter(getLevel(0), Predicates.in(compacting)));
 
             // L0 is the dumping ground for new sstables which thus may overlap each other.
             //
@@ -428,7 +548,7 @@ public class LeveledManifest
             // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best.
             Set<SSTableReader> candidates = new HashSet<SSTableReader>();
             Set<SSTableReader> remaining = new HashSet<SSTableReader>();
-            Iterables.addAll(remaining, Iterables.filter(generations[0], Predicates.not(suspectP)));
+            Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP)));
             for (SSTableReader sstable : ageSortedSSTables(remaining))
             {
                 if (candidates.contains(sstable))
@@ -447,7 +567,7 @@ public class LeveledManifest
                 if (candidates.size() > MAX_COMPACTING_L0)
                 {
                     // limit to only the MAX_COMPACTING_L0 oldest candidates
-                    candidates = new HashSet<SSTableReader>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
+                    candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
                     break;
                 }
             }
@@ -458,7 +578,7 @@ public class LeveledManifest
                 // add sstables from L1 that overlap candidates
                 // if the overlapping ones are already busy in a compaction, leave it out.
                 // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables
-                candidates = Sets.union(candidates, overlapping(candidates, generations[1]));
+                candidates = Sets.union(candidates, overlapping(candidates, getLevel(1)));
             }
             if (candidates.size() < 2)
                 return Collections.emptyList();
@@ -467,11 +587,11 @@ public class LeveledManifest
         }
 
         // for non-L0 compactions, pick up where we left off last time
-        Collections.sort(generations[level], SSTableReader.sstableComparator);
+        Collections.sort(getLevel(level), SSTableReader.sstableComparator);
         int start = 0; // handles case where the prior compaction touched the very last range
-        for (int i = 0; i < generations[level].size(); i++)
+        for (int i = 0; i < getLevel(level).size(); i++)
         {
-            SSTableReader sstable = generations[level].get(i);
+            SSTableReader sstable = getLevel(level).get(i);
             if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
             {
                 start = i;
@@ -481,10 +601,10 @@ public class LeveledManifest
 
         // look for a non-suspect keyspace to compact with, starting with where we left off last time,
         // and wrapping back to the beginning of the generation if necessary
-        for (int i = 0; i < generations[level].size(); i++)
+        for (int i = 0; i < getLevel(level).size(); i++)
         {
-            SSTableReader sstable = generations[level].get((start + i) % generations[level].size());
-            Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, generations[level + 1]));
+            SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size());
+            Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, getLevel(level + 1)));
             if (Iterables.any(candidates, suspectP))
                 continue;
             if (Sets.intersection(candidates, compacting).isEmpty())
@@ -512,7 +632,7 @@ public class LeveledManifest
     {
         for (int i = generations.length - 1; i >= 0; i--)
         {
-            if (generations[i].size() > 0)
+            if (getLevel(i).size() > 0)
                 return i;
         }
         return 0;
@@ -520,7 +640,7 @@ public class LeveledManifest
 
     public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
     {
-        return ImmutableSortedSet.copyOf(comparator, generations[level]);
+        return ImmutableSortedSet.copyOf(comparator, getLevel(level));
     }
 
     public List<SSTableReader> getLevel(int i)
@@ -535,7 +655,7 @@ public class LeveledManifest
 
         for (int i = generations.length - 1; i >= 0; i--)
         {
-            List<SSTableReader> sstables = generations[i];
+            List<SSTableReader> sstables = getLevel(i);
             estimated[i] = Math.max(0L, SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / maxSSTableSizeInBytes;
             tasks += estimated[i];
         }
@@ -570,6 +690,11 @@ public class LeveledManifest
 
     }
 
+    public boolean hasRepairedData()
+    {
+        return hasRepairedData;
+    }
+
     public static class CompactionCandidate
     {
         public final Collection<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 2416ed1..df4fd96 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -29,7 +29,8 @@ public enum OperationType
     INDEX_BUILD("Secondary index build"),
     /** Compaction for tombstone removal */
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
-    UNKNOWN("Unknown compaction type");
+    UNKNOWN("Unknown compaction type"),
+    ANTICOMPACTION("Anticompaction after repair");
 
     private final String type;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 666b933..32bef04 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -113,7 +114,7 @@ public class Scrubber implements Closeable
             }
 
             // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
+            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
 
             DecoratedKey prevKey = null;
 
@@ -271,7 +272,7 @@ public class Scrubber implements Closeable
 
         if (!outOfOrderRows.isEmpty())
         {
-            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
+            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
             for (Row row : outOfOrderRows)
                 inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index d1fe6a1..63d983c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,7 @@ import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
 
 public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -79,6 +81,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
         candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+        Pair<Set<SSTableReader>,Set<SSTableReader>> repairedUnrepaired = splitInRepairedAndUnrepaired(candidates);
+        if (repairedUnrepaired.left.size() > repairedUnrepaired.right.size())
+        {
+            candidates = repairedUnrepaired.left;
+        }
+        else
+        {
+            candidates = repairedUnrepaired.right;
+        }
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
@@ -89,7 +100,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
         // ratio is greater than threshold.
-        List<SSTableReader> sstablesWithTombstones = new ArrayList<SSTableReader>();
+        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
         for (SSTableReader sstable : candidates)
         {
             if (worthDroppingTombstones(sstable, gcBefore))
@@ -102,6 +113,20 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singletonList(sstablesWithTombstones.get(0));
     }
 
+    private static Pair<Set<SSTableReader>, Set<SSTableReader>> splitInRepairedAndUnrepaired(Iterable<SSTableReader> candidates)
+    {
+        Set<SSTableReader> repaired = new HashSet<>();
+        Set<SSTableReader> unRepaired = new HashSet<>();
+        for(SSTableReader candidate : candidates)
+        {
+            if (!candidate.isRepaired())
+                unRepaired.add(candidate);
+            else
+                repaired.add(candidate);
+        }
+        return Pair.create(repaired, unRepaired);
+    }
+
     /**
      * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
      * across all sstables
@@ -249,13 +274,22 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         }
     }
 
-    public AbstractCompactionTask getMaximalTask(final int gcBefore)
+    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
     {
-        Iterable<SSTableReader> sstables = cfs.markAllCompacting();
-        if (sstables == null)
+        Iterable<SSTableReader> allSSTables = cfs.markAllCompacting();
+        if (allSSTables == null)
             return null;
-
-        return new CompactionTask(cfs, sstables, gcBefore);
+        Set<SSTableReader> sstables = Sets.newHashSet(allSSTables);
+        Set<SSTableReader> repaired = new HashSet<>();
+        Set<SSTableReader> unrepaired = new HashSet<>();
+        for (SSTableReader sstable : sstables)
+        {
+            if (sstable.isRepaired())
+                repaired.add(sstable);
+            else
+                unrepaired.add(sstable);
+        }
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index de96668..022a3c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -61,7 +61,7 @@ public class Upgrader
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
-    private SSTableWriter createCompactionWriter()
+    private SSTableWriter createCompactionWriter(long repairedAt)
     {
         MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
 
@@ -77,7 +77,7 @@ public class Upgrader
             }
         }
 
-        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
+        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
     }
 
     public void upgrade()
@@ -94,7 +94,7 @@ public class Upgrader
 
         try
         {
-            SSTableWriter writer = createCompactionWriter();
+            SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
             writers.add(writer);
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 303f73c..fc37915 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.memory.HeapAllocator;
@@ -55,6 +56,7 @@ public abstract class AbstractSSTableSimpleWriter
         return new SSTableWriter(
             makeFilename(directory, metadata.ksName, metadata.cfName),
             0, // We don't care about the bloom filter
+            ActiveRepairService.UNREPAIRED_SSTABLE,
             metadata,
             DatabaseDescriptor.getPartitioner(),
             new MetadataCollector(metadata.comparator));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index d026d6c..a2a27d8 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -77,6 +77,7 @@ public class Descriptor
         public final boolean hasPostCompressionAdlerChecksums;
         public final boolean hasSamplingLevel;
         public final boolean newStatsFile;
+        public final boolean hasRepairedAt;
 
         public Version(String version)
         {
@@ -91,6 +92,7 @@ public class Descriptor
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
             hasSamplingLevel = version.compareTo("ka") >= 0;
             newStatsFile = version.compareTo("ka") >= 0;
+            hasRepairedAt = version.compareTo("ka") >= 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..587bf0a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
@@ -122,7 +123,7 @@ public class SSTableLoader implements StreamEventHandler
                         List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                         long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
 
-                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys);
+                        StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE);
                         streamingDetails.put(endpoint, details);
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 53c315d..ffb7be1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
@@ -1481,6 +1482,11 @@ public class SSTableReader extends SSTable implements Closeable
         }
     }
 
+    public boolean isRepaired()
+    {
+        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+    }
+
     /**
      * TODO: Move someplace reusable
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index abc513e..9b50a18 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -58,11 +58,13 @@ public class SSTableWriter extends SSTable
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
     private final MetadataCollector sstableMetadataCollector;
+    private final long repairedAt;
 
-    public SSTableWriter(String filename, long keyCount)
+    public SSTableWriter(String filename, long keyCount, long repairedAt)
     {
         this(filename,
              keyCount,
+             repairedAt,
              Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
              StorageService.getPartitioner(),
              new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
@@ -95,6 +97,7 @@ public class SSTableWriter extends SSTable
 
     public SSTableWriter(String filename,
                          long keyCount,
+                         long repairedAt,
                          CFMetaData metadata,
                          IPartitioner<?> partitioner,
                          MetadataCollector sstableMetadataCollector)
@@ -103,6 +106,7 @@ public class SSTableWriter extends SSTable
               components(metadata),
               metadata,
               partitioner);
+        this.repairedAt = repairedAt;
         iwriter = new IndexWriter(keyCount);
 
         if (compression)
@@ -362,7 +366,8 @@ public class SSTableWriter extends SSTable
         // write sstable statistics
         Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
                                                                                     partitioner.getClass().getCanonicalName(),
-                                                                                    metadata.getBloomFilterFpChance());
+                                                                                    metadata.getBloomFilterFpChance(),
+                                                                                    repairedAt);
         writeMetadata(descriptor, metadataComponents);
 
         // save the table of components

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index bd953ae..95fc627 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -64,4 +64,9 @@ public interface IMetadataSerializer
      * @throws IOException
      */
     void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
+
+    /**
+     * Mutate repairedAt time
+     */
+    void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 33d4f16..01464e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -26,6 +26,7 @@ import com.google.common.collect.Maps;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.StreamingHistogram;
@@ -145,7 +146,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                                                      tombstoneHistogram,
                                                      sstableLevel,
                                                      minColumnNames,
-                                                     maxColumnNames));
+                                                     maxColumnNames,
+                                                     ActiveRepairService.UNREPAIRED_SSTABLE));
                 if (types.contains(MetadataType.COMPACTION))
                     components.put(MetadataType.COMPACTION,
                                    new CompactionMetadata(ancestors, null));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index e20015d..84c35c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.StreamingHistogram;
@@ -65,7 +66,8 @@ public class MetadataCollector
                                  defaultTombstoneDropTimeHistogram(),
                                  0,
                                  Collections.<ByteBuffer>emptyList(),
-                                 Collections.<ByteBuffer>emptyList());
+                                 Collections.<ByteBuffer>emptyList(),
+                                 ActiveRepairService.UNREPAIRED_SSTABLE);
     }
 
     protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
@@ -214,7 +216,7 @@ public class MetadataCollector
         return this;
     }
 
-    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance)
+    public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt)
     {
         Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
         components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -228,7 +230,8 @@ public class MetadataCollector
                                                              estimatedTombstoneDropTime,
                                                              sstableLevel,
                                                              minColumnNames,
-                                                             maxColumnNames));
+                                                             maxColumnNames,
+                                                             repairedAt));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
         return components;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index d7962de..32a133a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -129,6 +129,21 @@ public class MetadataSerializer implements IMetadataSerializer
         StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
         // mutate level
         currentComponents.put(MetadataType.STATS, stats.mutateLevel(newLevel));
+        rewriteSSTableMetadata(descriptor, currentComponents);
+    }
+
+    public void mutateRepairedAt(Descriptor descriptor, long newRepairedAt) throws IOException
+    {
+        logger.debug("Mutating {} to repairedAt time {}", descriptor.filenameFor(Component.STATS), newRepairedAt);
+        Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+        StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+        // swap in a copy of the STATS component that carries the new repairedAt
+        currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt));
+        rewriteSSTableMetadata(descriptor, currentComponents);
+    }
+
+    private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
+    {
         Descriptor tmpDescriptor = descriptor.asTemporary(true);
 
         try (DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
@@ -140,5 +155,6 @@ public class MetadataSerializer implements IMetadataSerializer
         if (!FBUtilities.isUnix())
             FileUtils.delete(descriptor.filenameFor(Component.STATS));
         FileUtils.renameWithConfirm(tmpDescriptor.filenameFor(Component.STATS), descriptor.filenameFor(Component.STATS));
+
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 8055c77..cd8a529 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -53,6 +53,7 @@ public class StatsMetadata extends MetadataComponent
     public final int sstableLevel;
     public final List<ByteBuffer> maxColumnNames;
     public final List<ByteBuffer> minColumnNames;
+    public final long repairedAt;
 
     public StatsMetadata(EstimatedHistogram estimatedRowSize,
                          EstimatedHistogram estimatedColumnCount,
@@ -64,7 +65,8 @@ public class StatsMetadata extends MetadataComponent
                          StreamingHistogram estimatedTombstoneDropTime,
                          int sstableLevel,
                          List<ByteBuffer> minColumnNames,
-                         List<ByteBuffer> maxColumnNames)
+                         List<ByteBuffer> maxColumnNames,
+                         long repairedAt)
     {
         this.estimatedRowSize = estimatedRowSize;
         this.estimatedColumnCount = estimatedColumnCount;
@@ -77,6 +79,7 @@ public class StatsMetadata extends MetadataComponent
         this.sstableLevel = sstableLevel;
         this.minColumnNames = minColumnNames;
         this.maxColumnNames = maxColumnNames;
+        this.repairedAt = repairedAt;
     }
 
     public MetadataType getType()
@@ -120,7 +123,24 @@ public class StatsMetadata extends MetadataComponent
                                  estimatedTombstoneDropTime,
                                  newLevel,
                                  maxColumnNames,
-                                 minColumnNames);
+                                 minColumnNames,
+                                 repairedAt);
+    }
+
+    public StatsMetadata mutateRepairedAt(long newRepairedAt) // copy of this metadata with only repairedAt replaced
+    {
+        return new StatsMetadata(estimatedRowSize,
+                                 estimatedColumnCount,
+                                 replayPosition,
+                                 minTimestamp,
+                                 maxTimestamp,
+                                 maxLocalDeletionTime,
+                                 compressionRatio,
+                                 estimatedTombstoneDropTime,
+                                 sstableLevel,
+                                 minColumnNames, // constructor takes min before max; keep that order
+                                 maxColumnNames,
+                                 newRepairedAt);
+    }
 
     @Override
@@ -140,6 +160,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(compressionRatio, that.compressionRatio)
                        .append(estimatedTombstoneDropTime, that.estimatedTombstoneDropTime)
                        .append(sstableLevel, that.sstableLevel)
+                       .append(repairedAt, that.repairedAt)
                        .append(maxColumnNames, that.maxColumnNames)
                        .append(minColumnNames, that.minColumnNames)
                        .build();
@@ -158,6 +179,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(compressionRatio)
                        .append(estimatedTombstoneDropTime)
                        .append(sstableLevel)
+                       .append(repairedAt)
                        .append(maxColumnNames)
                        .append(minColumnNames)
                        .build();
@@ -171,7 +193,7 @@ public class StatsMetadata extends MetadataComponent
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize, TypeSizes.NATIVE);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount, TypeSizes.NATIVE);
             size += ReplayPosition.serializer.serializedSize(component.replayPosition, TypeSizes.NATIVE);
-            size += 8 + 8 + 4 + 8; // mix/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double)
+            size += 8 + 8 + 4 + 8 + 8; // min/max timestamp(long), maxLocalDeletionTime(int), compressionRatio(double), repairedAt (long)
             size += StreamingHistogram.serializer.serializedSize(component.estimatedTombstoneDropTime, TypeSizes.NATIVE);
             size += TypeSizes.NATIVE.sizeof(component.sstableLevel);
             // min column names
@@ -196,6 +218,7 @@ public class StatsMetadata extends MetadataComponent
             out.writeDouble(component.compressionRatio);
             StreamingHistogram.serializer.serialize(component.estimatedTombstoneDropTime, out);
             out.writeInt(component.sstableLevel);
+            out.writeLong(component.repairedAt);
             out.writeInt(component.minColumnNames.size());
             for (ByteBuffer columnName : component.minColumnNames)
                 ByteBufferUtil.writeWithShortLength(columnName, out);
@@ -215,6 +238,9 @@ public class StatsMetadata extends MetadataComponent
             double compressionRatio = in.readDouble();
             StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
             int sstableLevel = in.readInt();
+            long repairedAt = 0;
+            if (version.hasRepairedAt)
+                repairedAt = in.readLong();
             List<ByteBuffer> minColumnNames;
             List<ByteBuffer> maxColumnNames;
             if (version.tracksMaxMinColumnNames)
@@ -247,7 +273,8 @@ public class StatsMetadata extends MetadataComponent
                                      tombstoneHistogram,
                                      sstableLevel,
                                      minColumnNames,
-                                     maxColumnNames);
+                                     maxColumnNames,
+                                     repairedAt);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 16daf4e..2e6d5c2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -67,9 +67,9 @@ public class RepairJob
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
+    public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential)
     {
-        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
         this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7b72140/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 596540f..3e911ee 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -40,6 +42,7 @@ public class RepairJobDesc
 {
     public static final IVersionedSerializer<RepairJobDesc> serializer = new RepairJobDescSerializer();
 
+    public final UUID parentSessionId;
     /** RepairSession id */
     public final UUID sessionId;
     public final String keyspace;
@@ -47,8 +50,9 @@ public class RepairJobDesc
     /** repairing range  */
     public final Range<Token> range;
 
-    public RepairJobDesc(UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+    public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
     {
+        this.parentSessionId = parentSessionId;
         this.sessionId = sessionId;
         this.keyspace = keyspace;
         this.columnFamily = columnFamily;
@@ -58,13 +62,7 @@ public class RepairJobDesc
     @Override
     public String toString()
     {
-        StringBuilder sb = new StringBuilder("[repair #");
-        sb.append(sessionId);
-        sb.append(" on ");
-        sb.append(keyspace).append("/").append(columnFamily);
-        sb.append(", ").append(range);
-        sb.append("]");
-        return sb.toString();
+        return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + range + "]";
     }
 
     @Override
@@ -79,6 +77,7 @@ public class RepairJobDesc
         if (!keyspace.equals(that.keyspace)) return false;
         if (range != null ? !range.equals(that.range) : that.range != null) return false;
         if (!sessionId.equals(that.sessionId)) return false;
+        if (parentSessionId != null ? !parentSessionId.equals(that.parentSessionId) : that.parentSessionId != null) return false;
 
         return true;
     }
@@ -93,6 +92,12 @@ public class RepairJobDesc
     {
         public void serialize(RepairJobDesc desc, DataOutput out, int version) throws IOException
         {
+            if (version >= MessagingService.VERSION_21)
+            {
+                out.writeBoolean(desc.parentSessionId != null);
+                if (desc.parentSessionId != null)
+                    UUIDSerializer.serializer.serialize(desc.parentSessionId, out, version);
+            }
             UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
             out.writeUTF(desc.keyspace);
             out.writeUTF(desc.columnFamily);
@@ -101,16 +106,28 @@ public class RepairJobDesc
 
         public RepairJobDesc deserialize(DataInput in, int version) throws IOException
         {
+            UUID parentSessionId = null;
+            if (version >= MessagingService.VERSION_21)
+            {
+                if (in.readBoolean())
+                    parentSessionId = UUIDSerializer.serializer.deserialize(in, version);
+            }
             UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
             Range<Token> range = (Range<Token>)AbstractBounds.serializer.deserialize(in, version);
-            return new RepairJobDesc(sessionId, keyspace, columnFamily, range);
+            return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
         }
 
         public long serializedSize(RepairJobDesc desc, int version)
         {
             int size = 0;
+            if (version >= MessagingService.VERSION_21)
+            {
+                size += TypeSizes.NATIVE.sizeof(desc.parentSessionId != null);
+                if (desc.parentSessionId != null)
+                    size += UUIDSerializer.serializer.serializedSize(desc.parentSessionId, version);
+            }
             size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
             size += TypeSizes.NATIVE.sizeof(desc.keyspace);
             size += TypeSizes.NATIVE.sizeof(desc.columnFamily);