You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:58 UTC

[06/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index a096c78..46c0afd 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -107,7 +107,6 @@ public class CleanupTest
                                     SchemaLoader.compositeIndexCFMD(KEYSPACE2, CF_INDEXED2, true));
     }
 
-    /*
     @Test
     public void testCleanup() throws ExecutionException, InterruptedException
     {
@@ -116,7 +115,6 @@ public class CleanupTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
 
-        UnfilteredPartitionIterator iter;
 
         // insert data and verify we get it back w/ range query
         fillCF(cfs, "val", LOOPS);
@@ -124,8 +122,7 @@ public class CleanupTest
         // record max timestamps of the sstables pre-cleanup
         List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
 
-        iter = Util.getRangeSlice(cfs);
-        assertEquals(LOOPS, Iterators.size(iter));
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
 
         // with one token in the ring, owned by the local node, cleanup should be a no-op
         CompactionManager.instance.performCleanup(cfs, 2);
@@ -134,10 +131,8 @@ public class CleanupTest
         assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
 
         // check data is still there
-        iter = Util.getRangeSlice(cfs);
-        assertEquals(LOOPS, Iterators.size(iter));
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
     }
-    */
 
     @Test
     public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTransientTest.java b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
new file mode 100644
index 0000000..9789183
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CleanupTransientTest
+{
+    private static final IPartitioner partitioner = RandomPartitioner.instance;
+    private static IPartitioner oldPartitioner;
+
+    public static final int LOOPS = 200;
+    public static final String KEYSPACE1 = "CleanupTest1";
+    public static final String CF_INDEXED1 = "Indexed1";
+    public static final String CF_STANDARD1 = "Standard1";
+
+    public static final String KEYSPACE2 = "CleanupTestMultiDc";
+    public static final String CF_INDEXED2 = "Indexed2";
+    public static final String CF_STANDARD2 = "Standard2";
+
+    public static final ByteBuffer COLUMN = ByteBufferUtil.bytes("birthdate");
+    public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
+    static
+    {
+        VALUE.putLong(20101229);
+        VALUE.flip();
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
+        oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple("2/1"),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED1, true));
+
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 2;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        ArrayList<Token> endpointTokens = new ArrayList<>();
+        ArrayList<Token> keyTokens = new ArrayList<>();
+        List<InetAddressAndPort> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+
+        endpointTokens.add(RandomPartitioner.MINIMUM);
+        endpointTokens.add(RandomPartitioner.instance.midpoint(RandomPartitioner.MINIMUM, new RandomPartitioner.BigIntegerToken(RandomPartitioner.MAXIMUM)));
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+        {
+            @Override
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "RC1";
+            }
+
+            @Override
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return "DC1";
+            }
+        });
+    }
+
+    @Test
+    public void testCleanup() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
+
+
+        // insert data and verify we get it back w/ range query
+        fillCF(cfs, "val", LOOPS);
+
+        // record max timestamps of the sstables pre-cleanup
+        List<Long> expectedMaxTimestamps = getMaxTimestampList(cfs);
+
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
+
+        // with two tokens RF=2/1 and the sstable not repaired this should do nothing
+        CompactionManager.instance.performCleanup(cfs, 2);
+
+        // ensure max timestamp of the sstables are retained post-cleanup
+        assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
+        // check data is still there
+        assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
+
+        //Get an exact count of how many partitions are in the fully replicated range and should
+        //be retained
+        int fullCount = 0;
+        RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace.getName()).filter(Replica::isFull);
+        for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).build()))
+        {
+            Token token = partition.partitionKey().getToken();
+            for (Replica r : localRanges)
+            {
+                if (r.range().contains(token))
+                    fullCount++;
+            }
+        }
+
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false);
+        sstable.reloadSSTableMetadata();
+
+        // This should remove approximately 50% of the data, specifically whatever was transiently replicated
+        CompactionManager.instance.performCleanup(cfs, 2);
+
+        // ensure max timestamp of the sstables are retained post-cleanup
+        assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
+
+        // check less data is there, all transient data should be gone since the table was repaired
+        assertEquals(fullCount, Util.getAll(Util.cmd(cfs).build()).size());
+    }
+
+    protected void fillCF(ColumnFamilyStore cfs, String colName, int rowsPerSSTable)
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+            // create a row and update the birthdate value, test that the index query fetches the new version
+            new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key))
+                    .clustering(COLUMN)
+                    .add(colName, VALUE)
+                    .build()
+                    .applyUnsafe();
+        }
+
+        cfs.forceBlockingFlush();
+    }
+
+    protected List<Long> getMaxTimestampList(ColumnFamilyStore cfs)
+    {
+        List<Long> list = new LinkedList<Long>();
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+            list.add(sstable.getMaxTimestamp());
+        return list;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ImportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ImportTest.java b/test/unit/org/apache/cassandra/db/ImportTest.java
index 66bbff3..5ceb233 100644
--- a/test/unit/org/apache/cassandra/db/ImportTest.java
+++ b/test/unit/org/apache/cassandra/db/ImportTest.java
@@ -174,7 +174,7 @@ public class ImportTest extends CQLTester
         Set<SSTableReader> sstables = getCurrentColumnFamilyStore().getLiveSSTables();
         getCurrentColumnFamilyStore().clearUnsafe();
         for (SSTableReader sstable : sstables)
-            sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 111, null);
+            sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 111, null, false);
 
         File backupdir = moveToBackupDir(sstables);
         assertEquals(0, execute("select * from %s").size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
index e01088d..a864786 100644
--- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -308,7 +308,7 @@ public class RepairedDataTombstonesTest extends CQLTester
 
     public static void repair(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
-        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, null);
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, null, false);
         sstable.reloadSSTableMetadata();
         cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
index 706b274..1ac5440 100644
--- a/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/test/unit/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -70,6 +70,12 @@ public class RowUpdateBuilder
         this.updateBuilder.nowInSec(localDeletionTime);
     }
 
+    public RowUpdateBuilder timestamp(long ts)
+    {
+        updateBuilder.timestamp(ts);
+        return this;
+    }
+
     private Row.SimpleBuilder rowBuilder()
     {
         // Normally, rowBuilder is created by the call to clustering(), but we allow skipping that call for an empty

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 36af54f..b58909b 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -615,7 +615,7 @@ public class ScrubTest
     {
         SerializationHeader header = new SerializationHeader(true, metadata.get(), metadata.get().regularAndStaticColumns(), EncodingStats.NO_STATS);
         MetadataCollector collector = new MetadataCollector(metadata.get().comparator).sstableLevel(0);
-        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, metadata, collector, header, txn), txn);
+        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, null, false, metadata, collector, header, txn), txn);
     }
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
@@ -631,10 +631,10 @@ public class ScrubTest
      */
     private static class TestWriter extends BigTableWriter
     {
-        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, TableMetadataRef metadata,
+        TestWriter(Descriptor descriptor, long keyCount, long repairedAt, UUID pendingRepair, boolean isTransient, TableMetadataRef metadata,
                    MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
         {
-            super(descriptor, keyCount, repairedAt, pendingRepair, metadata, collector, header, Collections.emptySet(), txn);
+            super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, collector, header, Collections.emptySet(), txn);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
index 1c051f5..a14db00 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.assertEquals;
@@ -189,4 +191,28 @@ public class SystemKeyspaceMigrator40Test extends CQLTester
         }
         assertEquals(1, rowCount);
     }
+
+    @Test
+    public void testMigrateAvailableRanges() throws Throwable
+    {
+        Range<Token> testRange = new Range<>(DatabaseDescriptor.getPartitioner().getRandomToken(), DatabaseDescriptor.getPartitioner().getRandomToken());
+        String insert = String.format("INSERT INTO %s ("
+                                      + "keyspace_name, "
+                                      + "ranges) "
+                                      + " values ( ?, ? )",
+                                      SystemKeyspaceMigrator40.legacyAvailableRangesName);
+        execute(insert,
+                "foo",
+                ImmutableSet.of(SystemKeyspace.rangeToBytes(testRange)));
+        SystemKeyspaceMigrator40.migrate();
+
+        int rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.availableRangesName)))
+        {
+            rowCount++;
+            assertEquals("foo", row.getString("keyspace_name"));
+            assertEquals(ImmutableSet.of(testRange), SystemKeyspace.rawRangesToRangeSet(row.getSet("full_ranges", BytesType.instance), DatabaseDescriptor.getPartitioner()));
+        }
+        assertEquals(1, rowCount);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
index 32fa4e4..fff567b 100644
--- a/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
+++ b/test/unit/org/apache/cassandra/db/TableCQLHelperTest.java
@@ -26,6 +26,7 @@ import java.util.*;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
+import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -255,6 +256,7 @@ public class TableCQLHelperTest extends CQLTester
                .maxIndexInterval(7)
                .memtableFlushPeriod(8)
                .speculativeRetry(AlwaysSpeculativeRetryPolicy.INSTANCE)
+               .speculativeWriteThreshold(NeverSpeculativeRetryPolicy.INSTANCE)
                .extensions(ImmutableMap.of("ext1", ByteBuffer.wrap("val1".getBytes())))
                .recordColumnDrop(ColumnMetadata.regularColumn(keyspace, table, "reg1", AsciiType.instance),
                                  FBUtilities.timestampMicros());
@@ -272,6 +274,7 @@ public class TableCQLHelperTest extends CQLTester
         "\tAND max_index_interval = 7\n" +
         "\tAND memtable_flush_period_in_ms = 8\n" +
         "\tAND speculative_retry = 'ALWAYS'\n" +
+        "\tAND speculative_write_threshold = 'NEVER'\n" +
         "\tAND comment = 'comment'\n" +
         "\tAND caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' }\n" +
         "\tAND compaction = { 'max_threshold': '32', 'min_threshold': '4', 'sstable_size_in_mb': '1', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' }\n" +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index 0632274..df2acb4 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -421,7 +421,7 @@ public class VerifyTest
 
         // make the sstable repaired:
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
-        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, System.currentTimeMillis(), sstable.getSSTableMetadata().pendingRepair);
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, System.currentTimeMillis(), sstable.getPendingRepair(), sstable.isTransient());
         sstable.reloadSSTableMetadata();
 
         // break the sstable:
@@ -487,7 +487,7 @@ public class VerifyTest
         fillCF(cfs, 2);
 
         SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
-        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1, sstable.getSSTableMetadata().pendingRepair);
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1, sstable.getPendingRepair(), sstable.isTransient());
         sstable.reloadSSTableMetadata();
         cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
         assertTrue(sstable.isRepaired());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index a320248..4d62894 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -109,17 +109,16 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
         SSTableReader sstable = diff.iterator().next();
         if (orphan)
         {
-            Iterables.any(csm.getUnrepaired(), s -> s.getSSTables().contains(sstable));
-            csm.getUnrepaired().forEach(s -> s.removeSSTable(sstable));
+            csm.getUnrepairedUnsafe().allStrategies().forEach(acs -> acs.removeSSTable(sstable));
         }
         return sstable;
     }
 
-    protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair)
+    protected static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
     {
         try
         {
-            sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+            sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
             sstable.reloadSSTableMetadata();
         }
         catch (IOException e)
@@ -130,11 +129,11 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
 
     protected static void mutateRepaired(SSTableReader sstable, long repairedAt)
     {
-        mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+        mutateRepaired(sstable, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
     }
 
-    protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair)
+    protected static void mutateRepaired(SSTableReader sstable, UUID pendingRepair, boolean isTransient)
     {
-        mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+        mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 366c18e..f514ea6 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -39,6 +40,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -50,7 +53,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -58,6 +60,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.UpdateBuilder;
@@ -77,16 +80,21 @@ public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
     private static final String CF = "AntiCompactionTest";
+    private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+
     private static TableMetadata metadata;
     private static ColumnFamilyStore cfs;
+    private static InetAddressAndPort local;
+
 
     @BeforeClass
-    public static void defineSchema() throws ConfigurationException
+    public static void defineSchema() throws Throwable
     {
         SchemaLoader.prepareServer();
         metadata = SchemaLoader.standardCFMD(KEYSPACE1, CF).build();
         SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), metadata);
         cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
+        local = InetAddressAndPort.getByName("127.0.0.1");
     }
 
     @After
@@ -97,56 +105,86 @@ public class AntiCompactionTest
         store.truncateBlocking();
     }
 
-    private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
+    private void registerParentRepairSession(UUID sessionID, Iterable<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
     {
         ActiveRepairService.instance.registerParentRepairSession(sessionID,
                                                                  InetAddressAndPort.getByName("10.0.0.1"),
-                                                                 Lists.newArrayList(cfs), ranges,
+                                                                 Lists.newArrayList(cfs), ImmutableSet.copyOf(ranges),
                                                                  pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE,
                                                                  repairedAt, true, PreviewKind.NONE);
     }
 
-    private void antiCompactOne(long repairedAt, UUID pendingRepair) throws Exception
+    private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
     {
-        assert repairedAt != UNREPAIRED_SSTABLE || pendingRepair != null;
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+        for (Range<Token> range : full)
+            builder.add(new Replica(local, range, true));
 
-        ColumnFamilyStore store = prepareColumnFamilyStore();
-        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
-        assertEquals(store.getLiveSSTables().size(), sstables.size());
-        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
-        List<Range<Token>> ranges = Arrays.asList(range);
+        for (Range<Token> range : trans)
+            builder.add(new Replica(local, range, false));
+
+        return builder.build();
+    }
+
+    private static Collection<Range<Token>> range(int l, int r)
+    {
+        return Collections.singleton(new Range<>(new BytesToken(Integer.toString(l).getBytes()), new BytesToken(Integer.toString(r).getBytes())));
+    }
 
-        int repairedKeys = 0;
+    private static class SSTableStats
+    {
+        int numLiveSSTables = 0;
         int pendingKeys = 0;
-        int nonRepairedKeys = 0;
+        int transKeys = 0;
+        int unrepairedKeys = 0;
+    }
+
+    private SSTableStats antiCompactRanges(ColumnFamilyStore store, RangesAtEndpoint ranges) throws IOException
+    {
+        UUID sessionID = UUID.randomUUID();
+        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
             if (txn == null)
                 throw new IllegalStateException();
-            UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair;
-            registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair);
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
+            registerParentRepairSession(sessionID, ranges.ranges(), FBUtilities.nowInSeconds(), sessionID);
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, sessionID);
         }
 
-        assertEquals(2, store.getLiveSSTables().size());
+        SSTableStats stats = new SSTableStats();
+        stats.numLiveSSTables = store.getLiveSSTables().size();
+
+        Predicate<Token> fullContains = t -> Iterables.any(ranges.fullRanges(), r -> r.contains(t));
+        Predicate<Token> transContains = t -> Iterables.any(ranges.transientRanges(), r -> r.contains(t));
         for (SSTableReader sstable : store.getLiveSSTables())
         {
+            assertFalse(sstable.isRepaired());
+            assertEquals(sstable.isPendingRepair() ? sessionID : NO_PENDING_REPAIR, sstable.getPendingRepair());
             try (ISSTableScanner scanner = sstable.getScanner())
             {
                 while (scanner.hasNext())
                 {
                     UnfilteredRowIterator row = scanner.next();
-                    if (sstable.isRepaired() || sstable.isPendingRepair())
+                    Token token = row.partitionKey().getToken();
+                    if (sstable.isPendingRepair() && !sstable.isTransient())
                     {
-                        assertTrue(range.contains(row.partitionKey().getToken()));
-                        repairedKeys += sstable.isRepaired() ? 1 : 0;
-                        pendingKeys += sstable.isPendingRepair() ? 1 : 0;
+                        assertTrue(fullContains.test(token));
+                        assertFalse(transContains.test(token));
+                        stats.pendingKeys++;
+                    }
+                    else if (sstable.isPendingRepair() && sstable.isTransient())
+                    {
+
+                        assertTrue(transContains.test(token));
+                        assertFalse(fullContains.test(token));
+                        stats.transKeys++;
                     }
                     else
                     {
-                        assertFalse(range.contains(row.partitionKey().getToken()));
-                        nonRepairedKeys++;
+                        assertFalse(fullContains.test(token));
+                        assertFalse(transContains.test(token));
+                        stats.unrepairedKeys++;
                     }
                 }
             }
@@ -157,21 +195,40 @@ public class AntiCompactionTest
             assertEquals(1, sstable.selfRef().globalCount());
         }
         assertEquals(0, store.getTracker().getCompacting().size());
-        assertEquals(repairedKeys, repairedAt != UNREPAIRED_SSTABLE ? 4 : 0);
-        assertEquals(pendingKeys, pendingRepair != NO_PENDING_REPAIR ? 4 : 0);
-        assertEquals(nonRepairedKeys, 6);
+        return stats;
     }
 
     @Test
-    public void antiCompactOneRepairedAt() throws Exception
+    public void antiCompactOneFull() throws Exception
     {
-        antiCompactOne(1000, NO_PENDING_REPAIR);
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
+        assertEquals(2, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 4);
+        assertEquals(stats.transKeys, 0);
+        assertEquals(stats.unrepairedKeys, 6);
+    }
+
+    @Test
+    public void antiCompactOneMixed() throws Exception
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
+        assertEquals(3, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 4);
+        assertEquals(stats.transKeys, 4);
+        assertEquals(stats.unrepairedKeys, 2);
     }
 
     @Test
-    public void antiCompactOnePendingRepair() throws Exception
+    public void antiCompactOneTransOnly() throws Exception
     {
-        antiCompactOne(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
+        assertEquals(2, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 0);
+        assertEquals(stats.transKeys, 4);
+        assertEquals(stats.unrepairedKeys, 6);
     }
 
     @Test
@@ -190,7 +247,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(cfs, ranges, refs, txn, 12345, NO_PENDING_REPAIR, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(cfs, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
         }
         long sum = 0;
         long rows = 0;
@@ -208,7 +265,7 @@ public class AntiCompactionTest
         File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         Descriptor desc = cfs.newSSTableDescriptor(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, NO_PENDING_REPAIR, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {
@@ -240,7 +297,7 @@ public class AntiCompactionTest
     }
 
     @Test
-    public void antiCompactTen() throws InterruptedException, IOException
+    public void antiCompactTenFull() throws InterruptedException, IOException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
@@ -250,56 +307,59 @@ public class AntiCompactionTest
         {
             generateSStable(store,Integer.toString(table));
         }
-        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
-        assertEquals(store.getLiveSSTables().size(), sstables.size());
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), NO_RANGES));
+        /*
+        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
+        so there will be no net change in the number of sstables
+         */
+        assertEquals(10, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 40);
+        assertEquals(stats.transKeys, 0);
+        assertEquals(stats.unrepairedKeys, 60);
+    }
 
-        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
-        List<Range<Token>> ranges = Arrays.asList(range);
+    @Test
+    public void antiCompactTenTrans() throws InterruptedException, IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.disableAutoCompaction();
 
-        long repairedAt = 1000;
-        UUID parentRepairSession = UUID.randomUUID();
-        registerParentRepairSession(parentRepairSession, ranges, repairedAt, null);
-        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
-             Refs<SSTableReader> refs = Refs.ref(sstables))
+        for (int table = 0; table < 10; table++)
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, NO_PENDING_REPAIR, parentRepairSession);
+            generateSStable(store,Integer.toString(table));
         }
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
         /*
         Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
         so there will be no net change in the number of sstables
          */
-        assertEquals(10, store.getLiveSSTables().size());
-        int repairedKeys = 0;
-        int nonRepairedKeys = 0;
-        for (SSTableReader sstable : store.getLiveSSTables())
+        assertEquals(10, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 0);
+        assertEquals(stats.transKeys, 40);
+        assertEquals(stats.unrepairedKeys, 60);
+    }
+
+    @Test
+    public void antiCompactTenMixed() throws InterruptedException, IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.disableAutoCompaction();
+
+        for (int table = 0; table < 10; table++)
         {
-            try (ISSTableScanner scanner = sstable.getScanner())
-            {
-                while (scanner.hasNext())
-                {
-                    try (UnfilteredRowIterator row = scanner.next())
-                    {
-                        if (sstable.isRepaired())
-                        {
-                            assertTrue(range.contains(row.partitionKey().getToken()));
-                            assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
-                            repairedKeys++;
-                        }
-                        else
-                        {
-                            assertFalse(range.contains(row.partitionKey().getToken()));
-                            assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
-                            nonRepairedKeys++;
-                        }
-                    }
-                }
-            }
+            generateSStable(store,Integer.toString(table));
         }
-        assertEquals(repairedKeys, 40);
-        assertEquals(nonRepairedKeys, 60);
+        SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
+        assertEquals(15, stats.numLiveSSTables);
+        assertEquals(stats.pendingKeys, 40);
+        assertEquals(stats.transKeys, 40);
+        assertEquals(stats.unrepairedKeys, 20);
     }
 
-    private void shouldMutate(long repairedAt, UUID pendingRepair) throws InterruptedException, IOException
+    @Test
+    public void shouldMutatePendingRepair() throws InterruptedException, IOException
     {
         ColumnFamilyStore store = prepareColumnFamilyStore();
         Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
@@ -307,35 +367,23 @@ public class AntiCompactionTest
         // the sstables start at "0".getBytes() = 48, we need to include that first token, with "/".getBytes() = 47
         Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("9999".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
-        UUID parentRepairSession = pendingRepair == null ? UUID.randomUUID() : pendingRepair;
-        registerParentRepairSession(parentRepairSession, ranges, repairedAt, pendingRepair);
+        UUID pendingRepair = UUID.randomUUID();
+        registerParentRepairSession(pendingRepair, ranges, UNREPAIRED_SSTABLE, pendingRepair);
 
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt, pendingRepair, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, pendingRepair);
         }
 
         assertThat(store.getLiveSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(repairedAt != UNREPAIRED_SSTABLE));
-        assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(pendingRepair != NO_PENDING_REPAIR));
+        assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(false));
+        assertThat(Iterables.get(store.getLiveSSTables(), 0).isPendingRepair(), is(true));
         assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 
     @Test
-    public void shouldMutateRepairedAt() throws InterruptedException, IOException
-    {
-        shouldMutate(1, NO_PENDING_REPAIR);
-    }
-
-    @Test
-    public void shouldMutatePendingRepair() throws InterruptedException, IOException
-    {
-        shouldMutate(UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
-    }
-
-    @Test
     public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, IOException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
@@ -358,7 +406,7 @@ public class AntiCompactionTest
         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
              Refs<SSTableReader> refs = Refs.ref(sstables))
         {
-            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, NO_PENDING_REPAIR, parentRepairSession);
+            CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, parentRepairSession);
         }
         catch (IllegalStateException e)
         {
@@ -428,7 +476,7 @@ public class AntiCompactionTest
             Assert.assertFalse(refs.isEmpty());
             try
             {
-                CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, missingRepairSession, missingRepairSession);
+                CompactionManager.instance.performAnticompaction(store, atEndpoint(ranges, NO_RANGES), refs, txn, missingRepairSession);
                 Assert.fail("expected RuntimeException");
             }
             catch (RuntimeException e)
@@ -484,8 +532,7 @@ public class AntiCompactionTest
 
         Range<Token> r = new Range<>(t(9), t(100)); // sstable is not intersecting and should not be included
 
-        Iterator<SSTableReader> sstableIterator = sstables.iterator();
-        CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
+        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -500,8 +547,7 @@ public class AntiCompactionTest
 
         Range<Token> r = new Range<>(t(10), t(11)); // no sstable included, throw
 
-        Iterator<SSTableReader> sstableIterator = sstables.iterator();
-        CompactionManager.findSSTablesToAnticompact(sstableIterator, Collections.singletonList(r), UUID.randomUUID());
+        CompactionManager.validateSSTableBoundsForAnticompaction(UUID.randomUUID(), sstables, atEndpoint(Collections.singletonList(r), NO_RANGES));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index c7f1ae8..267c2e4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db.compaction;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -42,29 +41,34 @@ import org.apache.cassandra.utils.FBUtilities;
 public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingRepairTest
 {
 
-    private static boolean strategiesContain(Collection<AbstractCompactionStrategy> strategies, SSTableReader sstable)
+    private boolean transientContains(SSTableReader sstable)
     {
-        return Iterables.any(strategies, strategy -> strategy.getSSTables().contains(sstable));
-    }
-
-    private boolean pendingContains(UUID id, SSTableReader sstable)
-    {
-        return Iterables.any(csm.getPendingRepairManagers(), p -> p.get(id) != null && p.get(id).getSSTables().contains(sstable));
+        return csm.getTransientRepairsUnsafe().containsSSTable(sstable);
     }
 
     private boolean pendingContains(SSTableReader sstable)
     {
-        return Iterables.any(csm.getPendingRepairManagers(), p -> strategiesContain(p.getStrategies(), sstable));
+        return csm.getPendingRepairsUnsafe().containsSSTable(sstable);
     }
 
     private boolean repairedContains(SSTableReader sstable)
     {
-        return strategiesContain(csm.getRepaired(), sstable);
+        return csm.getRepairedUnsafe().containsSSTable(sstable);
     }
 
     private boolean unrepairedContains(SSTableReader sstable)
     {
-        return strategiesContain(csm.getUnrepaired(), sstable);
+        return csm.getUnrepairedUnsafe().containsSSTable(sstable);
+    }
+
+    private boolean hasPendingStrategiesFor(UUID sessionID)
+    {
+        return !Iterables.isEmpty(csm.getPendingRepairsUnsafe().getStrategiesFor(sessionID));
+    }
+
+    private boolean hasTransientStrategiesFor(UUID sessionID)
+    {
+        return !Iterables.isEmpty(csm.getTransientRepairsUnsafe().getStrategiesFor(sessionID));
     }
 
     /**
@@ -75,23 +79,25 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
     {
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
-        Assert.assertTrue(csm.pendingRepairs().isEmpty());
+        Assert.assertTrue(Iterables.isEmpty(csm.getPendingRepairsUnsafe().allStrategies()));
 
         SSTableReader sstable = makeSSTable(true);
         Assert.assertFalse(sstable.isRepaired());
         Assert.assertFalse(sstable.isPendingRepair());
 
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         Assert.assertFalse(sstable.isRepaired());
         Assert.assertTrue(sstable.isPendingRepair());
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         // add the sstable
         csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         Assert.assertFalse(repairedContains(sstable));
         Assert.assertFalse(unrepairedContains(sstable));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
-        Assert.assertTrue(pendingContains(repairID, sstable));
+        Assert.assertTrue(pendingContains(sstable));
+        Assert.assertTrue(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
     }
 
     @Test
@@ -101,16 +107,17 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
 
         SSTableReader sstable1 = makeSSTable(true);
-        mutateRepaired(sstable1, repairID);
+        mutateRepaired(sstable1, repairID, false);
 
         SSTableReader sstable2 = makeSSTable(true);
-        mutateRepaired(sstable2, repairID);
+        mutateRepaired(sstable2, repairID, false);
 
         Assert.assertFalse(repairedContains(sstable1));
         Assert.assertFalse(unrepairedContains(sstable1));
         Assert.assertFalse(repairedContains(sstable2));
         Assert.assertFalse(unrepairedContains(sstable2));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         // add only
         SSTableListChangedNotification notification;
@@ -119,13 +126,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
                                                           OperationType.COMPACTION);
         csm.handleNotification(notification, cfs.getTracker());
 
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
         Assert.assertFalse(repairedContains(sstable1));
         Assert.assertFalse(unrepairedContains(sstable1));
-        Assert.assertTrue(pendingContains(repairID, sstable1));
+        Assert.assertTrue(pendingContains(sstable1));
         Assert.assertFalse(repairedContains(sstable2));
         Assert.assertFalse(unrepairedContains(sstable2));
-        Assert.assertFalse(pendingContains(repairID, sstable2));
+        Assert.assertFalse(pendingContains(sstable2));
+        Assert.assertTrue(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         // remove and add
         notification = new SSTableListChangedNotification(Collections.singleton(sstable2),
@@ -135,10 +143,10 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
 
         Assert.assertFalse(repairedContains(sstable1));
         Assert.assertFalse(unrepairedContains(sstable1));
-        Assert.assertFalse(pendingContains(repairID, sstable1));
+        Assert.assertFalse(pendingContains(sstable1));
         Assert.assertFalse(repairedContains(sstable2));
         Assert.assertFalse(unrepairedContains(sstable2));
-        Assert.assertTrue(pendingContains(repairID, sstable2));
+        Assert.assertTrue(pendingContains(sstable2));
     }
 
     @Test
@@ -151,18 +159,20 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         SSTableReader sstable = makeSSTable(false);
         Assert.assertTrue(unrepairedContains(sstable));
         Assert.assertFalse(repairedContains(sstable));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         SSTableRepairStatusChanged notification;
 
         // change to pending repaired
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         notification = new SSTableRepairStatusChanged(Collections.singleton(sstable));
         csm.handleNotification(notification, cfs.getTracker());
         Assert.assertFalse(unrepairedContains(sstable));
         Assert.assertFalse(repairedContains(sstable));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
-        Assert.assertTrue(pendingContains(repairID, sstable));
+        Assert.assertTrue(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
+        Assert.assertTrue(pendingContains(sstable));
 
         // change to repaired
         mutateRepaired(sstable, System.currentTimeMillis());
@@ -170,7 +180,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         csm.handleNotification(notification, cfs.getTracker());
         Assert.assertFalse(unrepairedContains(sstable));
         Assert.assertTrue(repairedContains(sstable));
-        Assert.assertFalse(pendingContains(repairID, sstable));
+        Assert.assertFalse(pendingContains(sstable));
     }
 
     @Test
@@ -180,14 +190,14 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
 
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
-        Assert.assertTrue(pendingContains(repairID, sstable));
+        Assert.assertTrue(pendingContains(sstable));
 
         // delete sstable
         SSTableDeletingNotification notification = new SSTableDeletingNotification(sstable);
         csm.handleNotification(notification, cfs.getTracker());
-        Assert.assertFalse(pendingContains(repairID, sstable));
+        Assert.assertFalse(pendingContains(sstable));
         Assert.assertFalse(unrepairedContains(sstable));
         Assert.assertFalse(repairedContains(sstable));
     }
@@ -209,7 +219,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         Assert.assertTrue(strategies.get(2).isEmpty());
 
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
 
         strategies = csm.getStrategies();
@@ -227,11 +237,12 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         LocalSessionAccessor.finalizeUnsafe(repairID);
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
-        Assert.assertNotNull(pendingContains(repairID, sstable));
+        Assert.assertTrue(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
+        Assert.assertTrue(pendingContains(sstable));
         Assert.assertTrue(sstable.isPendingRepair());
         Assert.assertFalse(sstable.isRepaired());
 
@@ -245,7 +256,9 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
 
         Assert.assertTrue(repairedContains(sstable));
         Assert.assertFalse(unrepairedContains(sstable));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+        Assert.assertFalse(pendingContains(sstable));
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         // sstable should have pendingRepair cleared, and repairedAt set correctly
         long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt;
@@ -264,12 +277,13 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
         LocalSessionAccessor.failUnsafe(repairID);
 
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNotNull);
-        Assert.assertNotNull(pendingContains(repairID, sstable));
+        Assert.assertTrue(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
+        Assert.assertTrue(pendingContains(sstable));
         Assert.assertTrue(sstable.isPendingRepair());
         Assert.assertFalse(sstable.isRepaired());
 
@@ -283,11 +297,78 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
 
         Assert.assertFalse(repairedContains(sstable));
         Assert.assertTrue(unrepairedContains(sstable));
-        csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
 
         // sstable should have pendingRepair cleared, and repairedAt set correctly
         Assert.assertFalse(sstable.isPendingRepair());
         Assert.assertFalse(sstable.isRepaired());
         Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
     }
+
+    @Test
+    public void finalizedSessionTransientCleanup()
+    {
+        Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+        UUID repairID = registerSession(cfs, true, true);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID, true);
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
+        LocalSessionAccessor.finalizeUnsafe(repairID);
+
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertTrue(hasTransientStrategiesFor(repairID));
+        Assert.assertTrue(transientContains(sstable));
+        Assert.assertFalse(pendingContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+        Assert.assertFalse(unrepairedContains(sstable));
+
+        cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
+        AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+        Assert.assertNotNull(compactionTask);
+        Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+        // run the compaction
+        compactionTask.execute(null);
+
+        Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
+    }
+
+    @Test
+    public void failedSessionTransientCleanup()
+    {
+        Assert.assertTrue(cfs.getLiveSSTables().isEmpty());
+        UUID repairID = registerSession(cfs, true, true);
+        LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
+        SSTableReader sstable = makeSSTable(true);
+        mutateRepaired(sstable, repairID, true);
+        csm.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable), null), cfs.getTracker());
+        LocalSessionAccessor.failUnsafe(repairID);
+
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertTrue(hasTransientStrategiesFor(repairID));
+        Assert.assertTrue(transientContains(sstable));
+        Assert.assertFalse(pendingContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+        Assert.assertFalse(unrepairedContains(sstable));
+
+        cfs.getCompactionStrategyManager().enable(); // enable compaction to fetch next background task
+        AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds());
+        Assert.assertNotNull(compactionTask);
+        Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass());
+
+        // run the compaction
+        compactionTask.execute(null);
+
+        Assert.assertFalse(cfs.getLiveSSTables().isEmpty());
+        Assert.assertFalse(hasPendingStrategiesFor(repairID));
+        Assert.assertFalse(hasTransientStrategiesFor(repairID));
+        Assert.assertFalse(transientContains(sstable));
+        Assert.assertFalse(pendingContains(sstable));
+        Assert.assertFalse(repairedContains(sstable));
+        Assert.assertTrue(unrepairedContains(sstable));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index eeaaf5b..73e6852 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -129,12 +129,12 @@ public class CompactionStrategyManagerTest
             if (i % 3 == 0)
             {
                 //make 1 third of sstables repaired
-                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null);
+                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null, false);
             }
             else if (i % 3 == 1)
             {
                 //make 1 third of sstables pending repair
-                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID());
+                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID(), false);
             }
             previousSSTables = currentSSTables;
         }
@@ -272,19 +272,19 @@ public class CompactionStrategyManagerTest
         DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
     }
 
-    private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, Class<? extends AbstractStrategyHolder> expectedType)
+    private static void assertHolderExclusivity(boolean isRepaired, boolean isPendingRepair, boolean isTransient, Class<? extends AbstractStrategyHolder> expectedType)
     {
         ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
         CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
 
-        AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair);
+        AbstractStrategyHolder holder = csm.getHolder(isRepaired, isPendingRepair, isTransient);
         assertNotNull(holder);
         assertSame(expectedType, holder.getClass());
 
         int matches = 0;
         for (AbstractStrategyHolder other : csm.getHolders())
         {
-            if (other.managesRepairedGroup(isRepaired, isPendingRepair))
+            if (other.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
             {
                 assertSame("holder assignment should be mutually exclusive", holder, other);
                 matches++;
@@ -293,13 +293,13 @@ public class CompactionStrategyManagerTest
         assertEquals(1, matches);
     }
 
-    private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair)
+    private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
     {
         ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
         CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
         try
         {
-            csm.getHolder(isRepaired, isPendingRepair);
+            csm.getHolder(isRepaired, isPendingRepair, isTransient);
             fail("Expected IllegalArgumentException");
         }
         catch (IllegalArgumentException e)
@@ -315,10 +315,14 @@ public class CompactionStrategyManagerTest
     @Test
     public void testMutualExclusiveHolderClassification() throws Exception
     {
-        assertHolderExclusivity(false, false, CompactionStrategyHolder.class);
-        assertHolderExclusivity(true, false, CompactionStrategyHolder.class);
-        assertHolderExclusivity(false, true, PendingRepairHolder.class);
-        assertInvalieHolderConfig(true, true);
+        assertHolderExclusivity(false, false, false, CompactionStrategyHolder.class);
+        assertHolderExclusivity(true, false, false, CompactionStrategyHolder.class);
+        assertHolderExclusivity(false, true, false, PendingRepairHolder.class);
+        assertHolderExclusivity(false, true, true, PendingRepairHolder.class);
+        assertInvalieHolderConfig(true, true, false);
+        assertInvalieHolderConfig(true, true, true);
+        assertInvalieHolderConfig(false, false, true);
+        assertInvalieHolderConfig(true, false, true);
     }
 
     PartitionPosition forKey(int key)
@@ -337,20 +341,23 @@ public class CompactionStrategyManagerTest
         ColumnFamilyStore cfs = createJBODMockCFS(numDir);
         Keyspace.open(cfs.keyspace.getName()).getColumnFamilyStore(cfs.name).disableAutoCompaction();
         assertTrue(cfs.getLiveSSTables().isEmpty());
-        List<SSTableReader> unrepaired = new ArrayList<>();
+        List<SSTableReader> transientRepairs = new ArrayList<>();
         List<SSTableReader> pendingRepair = new ArrayList<>();
+        List<SSTableReader> unrepaired = new ArrayList<>();
         List<SSTableReader> repaired = new ArrayList<>();
 
         for (int i = 0; i < numDir; i++)
         {
             int key = 100 * i;
-            unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
+            transientRepairs.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
             pendingRepair.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
+            unrepaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
             repaired.add(createSSTableWithKey(cfs.keyspace.getName(), cfs.name, key++));
         }
 
-        cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID());
-        cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null);
+        cfs.getCompactionStrategyManager().mutateRepaired(transientRepairs, 0, UUID.randomUUID(), true);
+        cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, UUID.randomUUID(), false);
+        cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null, false);
 
         DiskBoundaries boundaries = new DiskBoundaries(cfs.getDirectories().getWriteableLocations(),
                                                        Lists.newArrayList(forKey(100), forKey(200), forKey(300)),
@@ -358,7 +365,7 @@ public class CompactionStrategyManagerTest
 
         CompactionStrategyManager csm = new CompactionStrategyManager(cfs, () -> boundaries, true);
 
-        List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat(repaired, pendingRepair, unrepaired));
+        List<GroupedSSTableContainer> grouped = csm.groupSSTables(Iterables.concat( transientRepairs, pendingRepair, repaired, unrepaired));
 
         for (int x=0; x<grouped.size(); x++)
         {
@@ -372,7 +379,16 @@ public class CompactionStrategyManagerTest
                 if (sstable.isRepaired())
                     expected = repaired.get(y);
                 else if (sstable.isPendingRepair())
-                    expected = pendingRepair.get(y);
+                {
+                    if (sstable.isTransient())
+                    {
+                        expected = transientRepairs.get(y);
+                    }
+                    else
+                    {
+                        expected = pendingRepair.get(y);
+                    }
+                }
                 else
                     expected = unrepaired.get(y);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
index 599fc74..5370f33 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
@@ -96,9 +96,9 @@ public class CompactionTaskTest
         Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
     }
 
-    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
     {
-        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+        sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
         sstable.reloadSSTableMetadata();
     }
 
@@ -127,9 +127,9 @@ public class CompactionTaskTest
         SSTableReader pending1 = sstables.get(2);
         SSTableReader pending2 = sstables.get(3);
 
-        mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR);
-        mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
-        mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+        mutateRepaired(repaired, FBUtilities.nowInSeconds(), ActiveRepairService.NO_PENDING_REPAIR, false);
+        mutateRepaired(pending1, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
+        mutateRepaired(pending2, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
 
         LifecycleTransaction txn = null;
         List<SSTableReader> toCompact = new ArrayList<>(sstables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index c91d2fe..857fa32 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -411,7 +411,7 @@ public class CompactionsCQLTest extends CQLTester
             cfs.forceBlockingFlush();
         }
         assertEquals(50, cfs.getLiveSSTables().size());
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
         // we should be compacting all 50 sstables:
         assertEquals(50, act.transaction.originals().size());
@@ -445,7 +445,7 @@ public class CompactionsCQLTest extends CQLTester
 
         // mark the L1 sstable as compacting to make sure we trigger STCS in L0:
         LifecycleTransaction txn = cfs.getTracker().tryModify(l1sstable, OperationType.COMPACTION);
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
         // note that max_threshold is 60 (more than the amount of L0 sstables), but MAX_COMPACTING_L0 is 32, which means we will trigger STCS with at most max_threshold sstables
         assertEquals(50, act.transaction.originals().size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9ebe326..23e88fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -364,7 +364,7 @@ public class LeveledCompactionStrategyTest
         SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
         SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
 
-        sstable1.descriptor.getMetadataSerializer().mutateRepaired(sstable1.descriptor, System.currentTimeMillis(), null);
+        sstable1.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable1.descriptor, System.currentTimeMillis(), null, false);
         sstable1.reloadSSTableMetadata();
         assertTrue(sstable1.isRepaired());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index 2b88c9c..d83e063 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -45,7 +45,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
 
@@ -63,7 +63,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
         LocalSessionAccessor.finalizeUnsafe(repairID);
@@ -82,7 +82,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
         LocalSessionAccessor.failUnsafe(repairID);
@@ -94,7 +94,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
     public void needsCleanupNoSession()
     {
         UUID fakeID = UUIDGen.getTimeUUID();
-        PendingRepairManager prm = new PendingRepairManager(cfs, null);
+        PendingRepairManager prm = new PendingRepairManager(cfs, null, false);
         Assert.assertTrue(prm.canCleanup(fakeID));
     }
 
@@ -106,7 +106,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
 
@@ -122,7 +122,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
         Assert.assertNotNull(prm.get(repairID));
@@ -140,13 +140,13 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
 
         repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         LocalSessionAccessor.finalizeUnsafe(repairID);
 
@@ -184,7 +184,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
 
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
         Assert.assertNotNull(prm.get(repairID));
@@ -202,7 +202,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         UUID repairID = registerSession(cfs, true, true);
         LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertNotNull(prm.get(repairID));
         Assert.assertNotNull(prm.get(repairID));
@@ -225,7 +225,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
         UUID repairId = registerSession(cfs, true, true);
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairId);
+        mutateRepaired(sstable, repairId, false);
         prm.addSSTable(sstable);
         List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Collections.singleton(sstable), 100);
         try
@@ -247,8 +247,8 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
         SSTableReader sstable = makeSSTable(true);
         SSTableReader sstable2 = makeSSTable(true);
 
-        mutateRepaired(sstable, repairId);
-        mutateRepaired(sstable2, repairId2);
+        mutateRepaired(sstable, repairId, false);
+        mutateRepaired(sstable2, repairId2, false);
         prm.addSSTable(sstable);
         prm.addSSTable(sstable2);
         List<AbstractCompactionTask> tasks = csm.getUserDefinedTasks(Lists.newArrayList(sstable, sstable2), 100);
@@ -296,7 +296,7 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
 
         Assert.assertFalse(prm.hasDataForSession(repairID));
         SSTableReader sstable = makeSSTable(true);
-        mutateRepaired(sstable, repairID);
+        mutateRepaired(sstable, repairID, false);
         prm.addSSTable(sstable);
         Assert.assertTrue(prm.hasDataForSession(repairID));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
index 6428ab7..1292b7e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -56,7 +56,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
         assertEquals(1, cfs.getLiveSSTables().size());
         cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel()));
         // make sure compaction strategy is notified:
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         for (int i = 0; i < lcs.manifest.getLevelCount(); i++)
         {
             if (i == 2)
@@ -98,7 +98,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
                 cfs.forceBlockingFlush();
         }
         // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
         act.execute(null);
 
@@ -148,7 +148,7 @@ public class SingleSSTableLCSTaskTest extends CQLTester
         assertEquals(1, cfs.getLiveSSTables().size());
         for (SSTableReader sst : cfs.getLiveSSTables())
             assertEquals(0, sst.getSSTableMetadata().sstableLevel);
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepairedUnsafe().first();
         assertEquals(1, lcs.getLevelSize(0));
         assertTrue(cfs.getTracker().getCompacting().isEmpty());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 5694e86..e5ff138 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -1183,7 +1183,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
 
         SerializationHeader header = SerializationHeader.make(cfs.metadata(), Collections.emptyList());
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata().comparator)
-                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, header)
+                                                 .finalizeMetadata(cfs.metadata().partitioner.getClass().getCanonicalName(), 0.01f, -1, null, false, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor,
                                                           components,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 5cda2ad..b7b7d4a 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -164,6 +164,7 @@ public class RealTransactionsTest extends SchemaLoader
                                                            0,
                                                            0,
                                                            null,
+                                                           false,
                                                            0,
                                                            SerializationHeader.make(cfs.metadata(), txn.originals()),
                                                            cfs.indexManager.listIndexes(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
index 76ebfd8..3b29cc5 100644
--- a/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/CompactionManagerGetSSTablesForValidationTest.java
@@ -119,11 +119,11 @@ public class CompactionManagerGetSSTablesForValidationTest
         Iterator<SSTableReader> iter = cfs.getLiveSSTables().iterator();
 
         repaired = iter.next();
-        repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, System.currentTimeMillis(), null);
+        repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, System.currentTimeMillis(), null, false);
         repaired.reloadSSTableMetadata();
 
         pendingRepair = iter.next();
-        pendingRepair.descriptor.getMetadataSerializer().mutateRepaired(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID);
+        pendingRepair.descriptor.getMetadataSerializer().mutateRepairMetadata(pendingRepair.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, false);
         pendingRepair.reloadSSTableMetadata();
 
         unrepaired = iter.next();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org