Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:59 UTC

[5/6] cassandra git commit: Extend Transactional API to sstable lifecycle management
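
This patch replaces the hand-rolled DataTracker bookkeeping (markCompacting /
unmarkCompacting / markCompactedSSTablesReplaced) with a LifecycleTransaction
that owns the compacting set for the lifetime of an operation. The shape the
diffs below converge on looks roughly like this (a minimal sketch of the new
API as it appears in this commit; the sstable set and the process() call are
placeholder assumptions, not code from the patch):

    // Check the sstables out of the Tracker; null means another operation won the race.
    try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION))
    {
        if (txn == null)
            return;                   // nothing checked out; closing a null resource is a no-op
        for (SSTableReader reader : txn.originals())
            process(reader);          // hypothetical per-sstable work
        txn.obsoleteOriginals();      // mark the inputs for deletion on commit
        txn.finish();                 // commit; the implicit close() that follows becomes a no-op
    }                                 // an exception before finish() rolls the transaction back

Because close() aborts whatever finish() has not committed, the success and
failure paths collapse into a single try-with-resources block, which is why
the finally clauses that used to call unmarkCompacting() disappear below.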

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d79b835..004e893 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -54,14 +54,10 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -82,12 +78,14 @@ import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static java.util.Collections.singleton;
+
 /**
  * <p>
  * A singleton which manages a private executor of ongoing compactions.
  * </p>
  * Scheduling for compaction is accomplished by swapping sstables to be compacted into
- * a set via DataTracker. New scheduling attempts will ignore currently compacting
+ * a set via Tracker. New scheduling attempts will ignore currently compacting
  * sstables.
  */
 public class CompactionManager implements CompactionManagerMBean
@@ -195,7 +193,7 @@ public class CompactionManager implements CompactionManagerMBean
     public boolean isCompacting(Iterable<ColumnFamilyStore> cfses)
     {
         for (ColumnFamilyStore cfs : cfses)
-            if (!cfs.getDataTracker().getCompacting().isEmpty())
+            if (!cfs.getTracker().getCompacting().isEmpty())
                 return true;
         return false;
     }
@@ -245,22 +243,22 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
+    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
     {
-        Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
-        if (compactingSSTables == null)
-        {
-            logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.ABORTED;
-        }
-        if (Iterables.isEmpty(compactingSSTables))
+        try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
         {
-            logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.SUCCESSFUL;
-        }
-        try
-        {
-            Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables);
+            if (compacting == null)
+            {
+                logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
+                return AllSSTableOpStatus.ABORTED;
+            }
+            if (compacting.originals().isEmpty())
+            {
+                logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+                return AllSSTableOpStatus.SUCCESSFUL;
+            }
+
+            Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals());
             List<Future<Object>> futures = new ArrayList<>();
 
             for (final SSTableReader sstable : sstables)
@@ -271,31 +269,30 @@ public class CompactionManager implements CompactionManagerMBean
                     return AllSSTableOpStatus.ABORTED;
                 }
 
+                final LifecycleTransaction txn = compacting.split(singleton(sstable));
                 futures.add(executor.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
                     {
-                        operation.execute(sstable);
+                        operation.execute(txn);
                         return this;
                     }
                 }));
             }
 
+            assert compacting.originals().isEmpty();
+
             for (Future<Object> f : futures)
                 f.get();
+            return AllSSTableOpStatus.SUCCESSFUL;
         }
-        finally
-        {
-            cfs.getDataTracker().unmarkCompacting(compactingSSTables);
-        }
-        return AllSSTableOpStatus.SUCCESSFUL;
     }
 
     private static interface OneSSTableOperation
     {
         Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input);
-        void execute(SSTableReader input) throws IOException;
+        void execute(LifecycleTransaction input) throws IOException;
     }
 
     public enum AllSSTableOpStatus { ABORTED(1), SUCCESSFUL(0);
@@ -318,11 +315,11 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public void execute(SSTableReader input) throws IOException
+            public void execute(LifecycleTransaction input) throws IOException
             {
                 scrubOne(cfs, input, skipCorrupted, checkData);
             }
-        });
+        }, OperationType.SCRUB);
     }
 
     public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
@@ -337,11 +334,11 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public void execute(SSTableReader input) throws IOException
+            public void execute(LifecycleTransaction input) throws IOException
             {
-                verifyOne(cfs, input, extendedVerify);
+                verifyOne(cfs, input.onlyOne(), extendedVerify);
             }
-        });
+        }, OperationType.VERIFY);
     }
 
     public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException
@@ -362,14 +359,14 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public void execute(SSTableReader input) throws IOException
+            public void execute(LifecycleTransaction txn) throws IOException
             {
-                AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE);
+                AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
                 task.setUserDefined(true);
                 task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                 task.execute(metrics);
             }
-        });
+        }, OperationType.UPGRADE_SSTABLES);
     }
 
     public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
@@ -395,12 +392,12 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public void execute(SSTableReader input) throws IOException
+            public void execute(LifecycleTransaction txn) throws IOException
             {
                 CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
-                doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
+                doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes);
             }
-        });
+        }, OperationType.CLEANUP);
     }
 
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
@@ -412,19 +409,19 @@ public class CompactionManager implements CompactionManagerMBean
             @Override
             public void runMayThrow() throws Exception
             {
-                boolean success = false;
-                while (!success)
+                LifecycleTransaction modifier = null;
+                while (modifier == null)
                 {
-                    for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+                    for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
                             compactedSSTables.add(sstable);
                     sstables.release(compactedSSTables);
-                    success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+                    modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                 }
-                performAnticompaction(cfs, ranges, sstables, repairedAt);
+                performAnticompaction(cfs, ranges, sstables, modifier, repairedAt);
             }
         };
         if (executor.isShutdown())
@@ -452,6 +449,7 @@ public class CompactionManager implements CompactionManagerMBean
     public void performAnticompaction(ColumnFamilyStore cfs,
                                       Collection<Range<Token>> ranges,
                                       Refs<SSTableReader> validatedForRepair,
+                                      LifecycleTransaction txn,
                                       long repairedAt) throws InterruptedException, IOException
     {
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
@@ -490,16 +488,18 @@ public class CompactionManager implements CompactionManagerMBean
                     }
                 }
             }
-            cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
-            cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+            txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
-                doAntiCompaction(cfs, ranges, sstables, repairedAt);
+                doAntiCompaction(cfs, ranges, txn, repairedAt);
+            txn.finish();
         }
         finally
         {
             validatedForRepair.release();
-            cfs.getDataTracker().unmarkCompacting(sstables);
+            txn.close();
         }
 
         logger.info("Completed anticompaction successfully");
@@ -657,9 +657,9 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException
+    private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
     {
-        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData);
+        Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData);
 
         CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
         metrics.beginCompaction(scrubInfo);
@@ -750,15 +750,16 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * @throws IOException
      */
-    private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
+    private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
     {
         assert !cfs.isIndex();
 
-        Set<SSTableReader> sstableSet = Collections.singleton(sstable);
+        SSTableReader sstable = txn.onlyOne();
 
         if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
         {
-            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+            txn.obsoleteOriginals();
+            txn.finish();
             return;
         }
         if (!needsCleanup(sstable, ranges))
@@ -772,13 +773,13 @@ public class CompactionManager implements CompactionManagerMBean
         long totalkeysWritten = 0;
 
         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
-                                               (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
+                                               (int) (SSTableReader.getApproximateKeyCount(txn.originals())));
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP));
+        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
@@ -786,10 +787,9 @@ public class CompactionManager implements CompactionManagerMBean
         CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
         metrics.beginCompaction(ci);
-        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
         List<SSTableReader> finished;
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
-             CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false);
+             CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
@@ -811,7 +811,6 @@ public class CompactionManager implements CompactionManagerMBean
             cfs.indexManager.flushIndexesBlocking();
 
             finished = writer.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
         }
         finally
         {
@@ -970,11 +969,11 @@ public class CompactionManager implements CompactionManagerMBean
             }
         }
         return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
-                                 (long)expectedBloomFilterSize,
-                                 repairedAt,
-                                 cfs.metadata,
-                                 cfs.partitioner,
-                                 new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
+                                    (long) expectedBloomFilterSize,
+                                    repairedAt,
+                                    cfs.metadata,
+                                    cfs.partitioner,
+                                    new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
     }
 
 
@@ -1057,7 +1056,7 @@ public class CompactionManager implements CompactionManagerMBean
             long numPartitions = 0;
             for (SSTableReader sstable : sstables)
             {
-                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
             }
             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
@@ -1119,37 +1118,39 @@ public class CompactionManager implements CompactionManagerMBean
      * will store the non-repaired ranges. Once anticompaction is completed, the original sstable is marked as compacted
      * and subsequently deleted.
      * @param cfs
-     * @param repairedSSTables
+     * @param repaired a transaction over the repaired sstables to anticompact
      * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
      * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
      */
-    private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
-                                                       Collection<SSTableReader> repairedSSTables, long repairedAt)
+    private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+        logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
 
         //Group SSTables
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
         {
-            int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt);
-            antiCompactedSSTableCount += antiCompacted;
+            try (LifecycleTransaction txn = repaired.split(sstableGroup))
+            {
+                int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt);
+                antiCompactedSSTableCount += antiCompacted;
+            }
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount);
+        logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
-                             Collection<SSTableReader> anticompactionGroup, long repairedAt)
+                             LifecycleTransaction anticompactionGroup, long repairedAt)
     {
         long groupMaxDataAge = -1;
 
         // check that compaction hasn't stolen any sstables used in previous repair sessions
         // if we need to skip the anticompaction, it will be carried out by the next repair
-        for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();)
+        for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();)
         {
             SSTableReader sstable = i.next();
             if (!new File(sstable.getFilename()).exists())
@@ -1162,26 +1163,25 @@ public class CompactionManager implements CompactionManagerMBean
                 groupMaxDataAge = sstable.maxDataAge;
         }
 
-     
-        if (anticompactionGroup.size() == 0)
+        if (anticompactionGroup.originals().size() == 0)
         {
             logger.info("No valid anticompactions for this group; all sstables were compacted and are no longer available");
             return 0;
         }
 
         logger.info("Anticompacting {}", anticompactionGroup);
-        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
+        Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
 
         File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
-             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
-             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
+             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
+             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
              CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
         {
-            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
+            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
             repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
             unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
@@ -1212,12 +1212,18 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 metrics.finishCompaction(ci);
             }
-            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
-            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+
             List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-            anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish());
-            anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish());
-            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
+            // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
+            // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
+            // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
+            anticompactionGroup.permitRedundantTransitions();
+            repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
+            unRepairedSSTableWriter.prepareToCommit();
+            anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+            anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+            repairedSSTableWriter.commit();
+            unRepairedSSTableWriter.commit();
 
             logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
                                                                        repairedKeyCount + unrepairedKeyCount,
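
Two details in the CompactionManager hunk above are worth a note. First,
parallelAllSSTableOperation now checks everything out in one
markAllCompacting() transaction and peels off a single-sstable child
transaction per task with split(), so each task commits or aborts on its own.
Condensed (task submission elided):

    try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType))
    {
        // split() removes each sstable from the parent, so iterate over a copy
        for (final SSTableReader sstable : new ArrayList<>(compacting.originals()))
        {
            final LifecycleTransaction txn = compacting.split(singleton(sstable));
            // each submitted task calls operation.execute(txn)
        }
        // split() moves every original into a child, hence the assertion in the diff
        assert compacting.originals().isEmpty();
        // (the real code then waits on the futures before the parent closes)
    }

Second, in antiCompactGroup the repaired and unrepaired writers share one
transaction, so the convenience Transactional.finish() cannot run for both;
instead each writer goes through prepareToCommit()/commit() explicitly, with
permitRedundantTransitions() allowing the second writer to repeat state
transitions the first has already made.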

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 34f57c1..e593ec0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.File;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -44,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -55,9 +54,9 @@ public class CompactionTask extends AbstractCompactionTask
     protected static long totalBytesCompacted = 0;
     private CompactionExecutorStatsCollector collector;
 
-    public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
+    public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline)
     {
-        super(cfs, Sets.newHashSet(sstables));
+        super(cfs, txn);
         this.gcBefore = gcBefore;
         this.offline = offline;
     }
@@ -71,23 +70,20 @@ public class CompactionTask extends AbstractCompactionTask
     {
         this.collector = collector;
         run();
-        return sstables.size();
+        return transaction.originals().size();
     }
 
     public boolean reduceScopeForLimitedSpace()
     {
-        if (partialCompactionsAcceptable() && sstables.size() > 1)
+        if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
         {
             // Try again w/o the largest one.
-            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
+            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(transaction.originals(), ", "));
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
-            SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables);
-            if (sstables.remove(removedSSTable))
-            {
-                cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable));
-                return true;
-            }
+            SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
+            transaction.cancel(removedSSTable);
+            return true;
         }
         return false;
     }
@@ -101,9 +97,9 @@ public class CompactionTask extends AbstractCompactionTask
     {
         // The collection of sstables passed may be empty (but not null); even if
         // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
+        assert transaction != null;
 
-        if (sstables.size() == 0)
+        if (transaction.originals().isEmpty())
             return;
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -115,12 +111,12 @@ public class CompactionTask extends AbstractCompactionTask
 
         // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
         // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
-        long expectedWriteSize = cfs.getExpectedCompactedFileSize(sstables, compactionType);
+        long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType);
         long earlySSTableEstimate = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
         checkAvailableDiskSpace(earlySSTableEstimate, expectedWriteSize);
 
         // sanity check: all sstables must belong to the same cfs
-        assert !Iterables.any(sstables, new Predicate<SSTableReader>()
+        assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
         {
             @Override
             public boolean apply(SSTableReader sstable)
@@ -129,13 +125,13 @@ public class CompactionTask extends AbstractCompactionTask
             }
         });
 
-        UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
+        UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals());
 
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
         StringBuilder ssTableLoggerMsg = new StringBuilder("[");
-        for (SSTableReader sstr : sstables)
+        for (SSTableReader sstr : transaction.originals())
         {
             ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
         }
@@ -148,11 +144,11 @@ public class CompactionTask extends AbstractCompactionTask
         long totalKeysWritten = 0;
 
         long estimatedKeys = 0;
-        try (CompactionController controller = getCompactionController(sstables))
+        try (CompactionController controller = getCompactionController(transaction.originals()))
         {
-            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
 
-            SSTableFormat.Type sstableFormat = getFormatType(sstables);
+            SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
 
             List<SSTableReader> newSStables;
             AbstractCompactionIterable ci;
@@ -171,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (!controller.cfs.getCompactionStrategy().isActive)
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact))
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
                 {
                     estimatedKeys = writer.estimatedKeys();
                     while (iter.hasNext())
@@ -205,13 +201,9 @@ public class CompactionTask extends AbstractCompactionTask
                 }
             }
 
-            Collection<SSTableReader> oldSStables = this.sstables;
-            if (!offline)
-                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
             // log a bunch of statistics about the result and save to system table compaction_history
             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            long startsize = SSTableReader.getTotalBytes(oldSStables);
+            long startsize = SSTableReader.getTotalBytes(transaction.originals());
             long endsize = SSTableReader.getTotalBytes(newSStables);
             double ratio = (double) endsize / (double) startsize;
 
@@ -223,7 +215,7 @@ public class CompactionTask extends AbstractCompactionTask
             long totalSourceRows = 0;
             String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
             logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                      taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+                                      taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 
@@ -236,9 +228,9 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, allSSTables, nonExpiredSSTables, offline, compactionType);
+        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType);
 
     }
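
Note the reduceScopeForLimitedSpace() change in the hunk above: instead of
mutating a local sstable set and unmarking through the DataTracker, the task
now edits its transaction directly. A condensed sketch (my reading of the new
cancel() semantics is that the reader leaves originals() and is handed back
to the tracker):

    if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
    {
        // Try again without the largest sstable.
        SSTableReader largest = cfs.getMaxSizeFile(transaction.originals());
        transaction.cancel(largest);   // dropped from the transaction's scope
        return true;                   // the caller retries with the reduced set
    }
    return false;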
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 6385671..18d5f7b 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Pair;
 
@@ -67,8 +68,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             if (latestBucket.isEmpty())
                 return null;
 
-            if (cfs.getDataTracker().markCompacting(latestBucket))
-                return new CompactionTask(cfs, latestBucket, gcBefore, false);
+            LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+            if (modifier != null)
+                return new CompactionTask(cfs, modifier, gcBefore, false);
         }
     }
 
@@ -366,11 +368,11 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     @Override
     public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
     {
-        Iterable<SSTableReader> sstables = cfs.markAllCompacting();
-        if (sstables == null)
+        LifecycleTransaction modifier = cfs.markAllCompacting(OperationType.COMPACTION);
+        if (modifier == null)
             return null;
 
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore, false));
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, modifier, gcBefore, false));
     }
 
     @Override
@@ -378,13 +380,14 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     {
         assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
 
-        if (!cfs.getDataTracker().markCompacting(sstables))
+        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (modifier == null)
         {
             logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
             return null;
         }
 
-        return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
+        return new CompactionTask(cfs, modifier, gcBefore, false).setUserDefined(true);
     }
 
     public int getEstimatedRemainingTasks()
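
The strategy-side pattern above recurs in the LCS and STCS diffs that follow:
the boolean markCompacting() guard becomes tryModify(), and on success the
returned transaction is handed to the task, which owns it from then on; the
task, not the strategy, finishes or closes it. Sketched from the hunks above:

    LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
    if (modifier == null)
        return null;   // another operation holds (some of) these sstables; skip this round
    return new CompactionTask(cfs, modifier, gcBefore, false);   // ownership passes to the task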

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 6b82ad3..c434d31 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -115,9 +116,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
                 op = OperationType.COMPACTION;
             }
 
-            if (cfs.getDataTracker().markCompacting(candidate.sstables))
+            LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
+            if (txn != null)
             {
-                LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+                LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
                 newTask.setCompactionType(op);
                 return newTask;
             }
@@ -131,9 +133,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(sstables))
             return null;
-        if (!cfs.getDataTracker().markCompacting(filteredSSTables))
+        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+        if (txn == null)
             return null;
-        return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, filteredSSTables, 0, gcBefore, getMaxSSTableBytes(), true));
+        return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, txn, 0, gcBefore, getMaxSSTableBytes(), true));
 
     }
 
@@ -144,19 +147,19 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
-    public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, int gcBefore, long maxSSTableBytes)
+    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
-        assert sstables.size() > 0;
+        assert txn.originals().size() > 0;
         int level = -1;
         // if all sstables are in the same level, we can set that level:
-        for (SSTableReader sstable : sstables)
+        for (SSTableReader sstable : txn.originals())
         {
             if (level == -1)
                 level = sstable.getSSTableLevel();
             if (level != sstable.getSSTableLevel())
                 level = 0;
         }
-        return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes, false);
+        return new LeveledCompactionTask(cfs, txn, level, gcBefore, maxSSTableBytes, false);
     }
 
     /**
@@ -226,7 +229,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         {
             for (Integer level : byLevel.keySet())
             {
-                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // level can be -1 when sstables are added to Tracker but not to LeveledManifest
                 // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
                 if (level <= 0)
                 {
@@ -402,7 +405,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             if (sstables.isEmpty())
                 continue;
 
-            Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+            Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
             for (SSTableReader sstable : sstables)
             {
                 if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ce9dfaf..1c3b686 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 
 public class LeveledCompactionTask extends CompactionTask
 {
@@ -31,20 +32,20 @@ public class LeveledCompactionTask extends CompactionTask
     private final long maxSSTableBytes;
     private final boolean majorCompaction;
 
-    public LeveledCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes, boolean majorCompaction)
+    public LeveledCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level, int gcBefore, long maxSSTableBytes, boolean majorCompaction)
     {
-        super(cfs, sstables, gcBefore, false);
+        super(cfs, txn, gcBefore, false);
         this.level = level;
         this.maxSSTableBytes = maxSSTableBytes;
         this.majorCompaction = majorCompaction;
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
-        return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
+            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
+        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index daff131..0d0928f 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -23,7 +23,6 @@ import java.util.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -316,7 +315,7 @@ public class LeveledManifest
                 continue; // mostly this just avoids polluting the debug log with zero scores
             // we want to calculate score excluding compacting ones
             Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
-            Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting());
+            Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting());
             double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes);
             logger.debug("Compaction score for level {} is {}", i, score);
 
@@ -361,7 +360,7 @@ public class LeveledManifest
 
     private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
     {
-        Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
+        Iterable<SSTableReader> candidates = cfs.getTracker().getUncompacting(sstables);
         List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
         List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
                                                                                     options.bucketHigh,
@@ -415,7 +414,7 @@ public class LeveledManifest
                     }
                     if (min == null || max == null || min.equals(max)) // single partition sstables - we cannot include a high level sstable.
                         return candidates;
-                    Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+                    Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
                     Range<RowPosition> boundaries = new Range<>(min, max);
                     for (SSTableReader sstable : getLevel(i))
                     {
@@ -542,7 +541,7 @@ public class LeveledManifest
         assert !getLevel(level).isEmpty();
         logger.debug("Choosing candidates for L{}", level);
 
-        final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+        final Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
 
         if (level == 0)
         {
@@ -650,7 +649,7 @@ public class LeveledManifest
     {
         Set<SSTableReader> sstables = new HashSet<>();
         Set<SSTableReader> levelSSTables = new HashSet<>(getLevel(level));
-        for (SSTableReader sstable : cfs.getDataTracker().getCompacting())
+        for (SSTableReader sstable : cfs.getTracker().getCompacting())
         {
             if (levelSSTables.contains(sstable))
                 sstables.add(sstable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 8d7b0e9..e9a4f05 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 
 public class SSTableSplitter {
 
@@ -30,9 +31,9 @@ public class SSTableSplitter {
 
     private CompactionInfo.Holder info;
 
-    public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+    public SSTableSplitter(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
     {
-        this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+        this.task = new SplittingCompactionTask(cfs, transaction, sstableSizeInMB);
     }
 
     public void split()
@@ -57,9 +58,9 @@ public class SSTableSplitter {
     {
         private final int sstableSizeInMB;
 
-        public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+        public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
         {
-            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
+            super(cfs, transaction, CompactionManager.NO_GC, true);
             this.sstableSizeInMB = sstableSizeInMB;
 
             if (sstableSizeInMB <= 0)
@@ -73,9 +74,9 @@ public class SSTableSplitter {
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
+            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 1e014ed..b7c149c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -23,9 +23,9 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -41,6 +41,7 @@ public class Scrubber implements Closeable
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
+    private final LifecycleTransaction transaction;
     private final File destination;
     private final boolean skipCorrupted;
 
@@ -80,15 +81,16 @@ public class Scrubber implements Closeable
     };
     private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
     {
-        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
+        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
     }
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
     {
         this.cfs = cfs;
-        this.sstable = sstable;
+        this.transaction = transaction;
+        this.sstable = transaction.onlyOne();
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
@@ -127,9 +129,7 @@ public class Scrubber implements Closeable
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
-        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
-
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);)
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline);)
         {
             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
@@ -278,8 +278,7 @@ public class Scrubber implements Closeable
                         inOrderWriter.append(row.key, row.cf);
                     newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
                 }
-                if (!isOffline)
-                    cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+                transaction.update(newInOrderSstable, false);
                 outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
             }
 
@@ -287,8 +286,6 @@ public class Scrubber implements Closeable
             List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish();
             if (!finished.isEmpty())
                 newSstable = finished.get(0);
-            if (!isOffline)
-                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
         }
         catch (IOException e)
         {
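
Scrubber also picks up one more transaction capability: the sstable built from
the re-sorted out-of-order rows is no longer added straight to the tracker
(and only when online); it is staged on the transaction with update(), so it
commits or rolls back together with the rewriter's output. My reading of the
boolean argument, going by how this diff uses it:

    // Stage a brand-new reader on the transaction; false = not one of the originals.
    transaction.update(newInOrderSstable, false);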

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 722536c..94c3daf 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,22 +21,19 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.utils.Pair;
 
 public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -190,8 +187,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             if (hottestBucket.isEmpty())
                 return null;
 
-            if (cfs.getDataTracker().markCompacting(hottestBucket))
-                return new CompactionTask(cfs, hottestBucket, gcBefore, false);
+            LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
+            if (transaction != null)
+                return new CompactionTask(cfs, transaction, gcBefore, false);
         }
     }
 
@@ -200,24 +198,26 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(filteredSSTables))
             return null;
-        if (!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables)))
+        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+        if (txn == null)
             return null;
         if (splitOutput)
-            return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, filteredSSTables, gcBefore, false));
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false));
+            return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, txn, gcBefore, false));
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false));
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
     {
         assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
 
-        if (!cfs.getDataTracker().markCompacting(sstables))
+        LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+        if (transaction == null)
         {
             logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first.  You can disable background compactions temporarily if this is a problem", sstables);
             return null;
         }
 
-        return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
+        return new CompactionTask(cfs, transaction, gcBefore, false).setUserDefined(true);
     }
 
     public int getEstimatedRemainingTasks()
@@ -338,15 +338,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     private static class SplittingCompactionTask extends CompactionTask
     {
-        public SplittingCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
+        public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline)
         {
-            super(cfs, sstables, gcBefore, offline);
+            super(cfs, txn, gcBefore, offline);
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
         {
-            return new SplittingSizeTieredCompactionWriter(cfs, allSSTables, nonExpiredSSTables, compactionType);
+            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType);
         }
     }
 }

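The three call sites above apply the same substitution: the boolean markCompacting() handshake becomes tryModify(), which returns either a LifecycleTransaction that now owns the named sstables, or null when another operation already holds any of them. A hedged distillation of the idiom ("candidates" is an assumed local; the rest is lifted from the hunks):

    // reserve the sstables, or back off if a concurrent compaction beat us to them
    LifecycleTransaction txn = cfs.getTracker().tryModify(candidates, OperationType.COMPACTION);
    if (txn == null)
        return null;  // nothing was reserved, so there is nothing to release
    return new CompactionTask(cfs, txn, gcBefore, false);  // the task takes over the transaction
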
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 5bb1530..6556a71 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -20,12 +20,10 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.util.*;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -37,6 +35,7 @@ public class Upgrader
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
+    private final LifecycleTransaction transaction;
     private final File directory;
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -46,10 +45,11 @@ public class Upgrader
 
     private final OutputHandler outputHandler;
 
-    public Upgrader(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler)
+    public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler outputHandler)
     {
         this.cfs = cfs;
-        this.sstable = sstable;
+        this.transaction = txn;
+        this.sstable = txn.onlyOne();
         this.outputHandler = outputHandler;
 
         this.directory = new File(sstable.getFilename()).getParentFile();
@@ -81,10 +81,9 @@ public class Upgrader
     public void upgrade()
     {
         outputHandler.output("Upgrading " + sstable);
-        Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
 
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
-             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
+             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals()))
         {
             Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));

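As in the Scrubber, the Upgrader's ad-hoc Sets.newHashSet(sstable) working set disappears: transaction.originals() is the canonical view of the sstables under modification, and transaction.onlyOne() asserts that this single-sstable operation really holds just one reader. A minimal hedged sketch of constructing one, mirroring the tryModify call sites elsewhere in this patch:

    LifecycleTransaction txn = cfs.getTracker().tryModify(singleton(sstable), OperationType.UPGRADE_SSTABLES);
    if (txn != null)
        new Upgrader(cfs, txn, outputHandler).upgrade();
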
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 5345d8d..c511bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -50,7 +50,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     {
         super(cfs, cfs.metadata.compactionStrategyOptions);
         reloadCompactionStrategy(cfs.metadata);
-        cfs.getDataTracker().subscribe(this);
+        cfs.getTracker().subscribe(this);
         logger.debug("{} subscribed to the data tracker.", this);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index fe43186..20c96d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -43,14 +44,14 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long minRepairedAt;
     protected final SSTableRewriter sstableWriter;
 
-    public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline)
+    public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
     {
         this.cfs = cfs;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        this.sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
+        this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline);
     }
 
     /**
@@ -67,12 +68,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     }
 
     @Override
-    protected Throwable doCleanup(Throwable accumulate)
-    {
-        return accumulate;
-    }
-
-    @Override
     protected Throwable doCommit(Throwable accumulate)
     {
         return sstableWriter.commit(accumulate);

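Two things happen in this hunk: the writer builds its internal SSTableRewriter from the transaction rather than from a bare set, and the empty doCleanup() override is dropped, implying Transactional.AbstractTransactional now supplies that default itself. For orientation, a hedged sketch of the commit-or-abort shape the rewriter presents to callers; switchWriter/finish and the try-with-resources form are taken from other hunks in this patch, while the abort-on-close behaviour is inferred from the Transactional pattern rather than shown here:

    try (SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, maxAge, offline))
    {
        rewriter.switchWriter(writer);  // begin writing to the first output sstable
        // ... append merged rows ...
        List<SSTableReader> results = rewriter.finish();  // commit: originals are replaced
    }   // leaving the block without finish() should abort the rewrite
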
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 3589b54..0b31061 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers;
 
 
 import java.io.File;
-import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -28,14 +27,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
-import static org.apache.cassandra.utils.Throwables.maybeFail;
-
 
 /**
  * The default compaction writer - creates one output file in L0
@@ -44,9 +41,9 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
     {
-        super(cfs, allSSTables, nonExpiredSSTables, offline);
+        super(cfs, txn, nonExpiredSSTables, offline);
         logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
         long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@@ -55,7 +52,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0));
+                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0));
         sstableWriter.switchWriter(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index d48140e..014b4af 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.db.compaction.writers;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -28,11 +26,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -50,11 +47,11 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private int sstablesWritten = 0;
     private final boolean skipAncestors;
 
-    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
+    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
     {
-        super(cfs, allSSTables, nonExpiredSSTables, offline);
+        super(cfs, txn, nonExpiredSSTables, offline);
         this.maxSSTableSize = maxSSTableSize;
-        this.allSSTables = allSSTables;
+        this.allSSTables = txn.originals();
         expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index ab24bf8..8903ff7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -18,16 +18,14 @@
 package org.apache.cassandra.db.compaction.writers;
 
 import java.io.File;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -41,10 +39,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final long estimatedSSTables;
     private final Set<SSTableReader> allSSTables;
 
-    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
+    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
     {
-        super(cfs, allSSTables, nonExpiredSSTables, offline);
-        this.allSSTables = allSSTables;
+        super(cfs, txn, nonExpiredSSTables, offline);
+        this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
         long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 2a452c7..81ea6b1 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers;
 
 import java.io.File;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -28,10 +27,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -53,15 +51,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
 
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
     {
-        this(cfs, allSSTables, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
+        this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
     }
 
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
     {
-        super(cfs, allSSTables, nonExpiredSSTables, false);
-        this.allSSTables = allSSTables;
+        super(cfs, txn, nonExpiredSSTables, false);
+        this.allSSTables = txn.originals();
         totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
         double[] potentialRatios = new double[20];
         double currentRatio = 1;

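The same two-line change runs through all four writer subclasses above: the constructor accepts the LifecycleTransaction in place of allSSTables, forwards it to CompactionAwareWriter, and recovers the full input set via txn.originals() where it still needs one. A hedged template for a further subclass (the class name is the editor's invention, and the abstract row-appending methods are elided):

    public abstract class ExampleCompactionWriter extends CompactionAwareWriter
    {
        private final Set<SSTableReader> allSSTables;

        public ExampleCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn,
                                       Set<SSTableReader> nonExpiredSSTables, boolean offline)
        {
            super(cfs, txn, nonExpiredSSTables, offline);
            this.allSSTables = txn.originals();  // full original set, expired sstables included
        }
    }
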
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f8b3aba..ba48350 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -57,7 +57,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
                                                              indexedCfMetadata.cfName,
                                                              new LocalPartitioner(getIndexKeyComparator()),
                                                              indexedCfMetadata,
-                                                             baseCfs.getDataTracker().loadsstables);
+                                                             baseCfs.getTracker().loadsstables);
     }
 
     protected AbstractType<?> getIndexKeyComparator()
@@ -143,7 +143,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
     {
         Future<?> wait;
         // we synchronise on the baseCfs to make sure we are ordered correctly with other flushes to the base CFS
-        synchronized (baseCfs.getDataTracker())
+        synchronized (baseCfs.getTracker())
         {
             wait = indexCfs.forceFlush();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index dda532d..4c1bf45 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -346,7 +346,7 @@ public class SecondaryIndexManager
     {
         // despatch flushes for all CFS backed indexes
         List<Future<?>> wait = new ArrayList<>();
-        synchronized (baseCfs.getDataTracker())
+        synchronized (baseCfs.getTracker())
         {
             for (SecondaryIndex index : allIndexes)
                 if (index.getIndexCfs() != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
new file mode 100644
index 0000000..05f7531
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static com.google.common.base.Predicates.*;
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.getFirst;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+class Helpers
+{
+    /**
+     * update the contents of a set using the provided remove/add collections, ensuring that the items
+     * to remove are really present, and that the items to add are not (unless we're also removing them)
+     * @return a new set: the provided one with the removals and additions applied
+     */
+    static <T> Set<T> replace(Set<T> original, Set<T> remove, Iterable<T> add)
+    {
+        return ImmutableSet.copyOf(replace(identityMap(original), remove, add).keySet());
+    }
+
+    /**
+     * update the contents of an "identity map" using the provided remove/add collections, ensuring that
+     * the items to remove are really present, and that the items to add are not (unless we're also removing them)
+     * @return a new identity map: the provided one with the removals and additions applied
+     */
+    static <T> Map<T, T> replace(Map<T, T> original, Set<T> remove, Iterable<T> add)
+    {
+        // ensure the ones being removed are the exact same ones present
+        for (T reader : remove)
+            assert original.get(reader) == reader;
+
+        // ensure we don't already contain any we're adding, that we aren't also removing
+        assert !any(add, and(not(in(remove)), in(original.keySet()))) : String.format("original:%s remove:%s add:%s", original.keySet(), remove, add);
+
+        Map<T, T> result =
+            identityMap(concat(add, filter(original.keySet(), not(in(remove)))));
+
+        assert result.size() == original.size() - remove.size() + Iterables.size(add) :
+        String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
+                      original.size() - remove.size() + Iterables.size(add), result.size(), remove, add, original.keySet());
+        return result;
+    }
+
+    /**
+     * A convenience method for applying this action to multiple SSTableReaders with exception safety
+     * @return accumulate with any newly thrown exception merged in, or the thrown exception itself if accumulate was null
+     */
+    static Throwable setupDeleteNotification(Iterable<SSTableReader> readers, Tracker tracker, Throwable accumulate)
+    {
+        try
+        {
+            for (SSTableReader reader : readers)
+                reader.setupDeleteNotification(tracker);
+        }
+        catch (Throwable t)
+        {
+            // shouldn't be possible, but in case the contract changes in the future and we miss it...
+            accumulate = merge(accumulate, t);
+        }
+        return accumulate;
+    }
+
+    /**
+     * A convenience method for applying this action to multiple SSTableReaders with exception safety
+     * @return accumulate with any newly thrown exception merged in, or the thrown exception itself if accumulate was null
+     */
+    static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate)
+    {
+        for (SSTableReader reader : readers)
+        {
+            try
+            {
+                reader.setReplaced();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    /**
+     * assert that none of these readers have been replaced
+     */
+    static void checkNotReplaced(Iterable<SSTableReader> readers)
+    {
+        for (SSTableReader reader : readers)
+            assert !reader.isReplaced();
+    }
+
+    /**
+     * A convenience method for applying this action to multiple SSTableReaders with exception safety
+     * @return accumulate with any newly thrown exception merged in, or the thrown exception itself if accumulate was null
+     */
+    static Throwable markObsolete(Iterable<SSTableReader> readers, Throwable accumulate)
+    {
+        for (SSTableReader reader : readers)
+        {
+            try
+            {
+                boolean firstToCompact = reader.markObsolete();
+                assert firstToCompact : reader + " was already marked compacted";
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    /**
+     * @return the identity function, as a Map, whose domain is the provided values
+     */
+    static <T> Map<T, T> identityMap(Iterable<T> values)
+    {
+        ImmutableMap.Builder<T, T> builder = ImmutableMap.<T, T>builder();
+        for (T t : values)
+            builder.put(t, t);
+        return builder.build();
+    }
+
+    /**
+     * @return an Iterable of the union of the sets, with duplicates represented by their first encountered instance
+     * (in the order the sets are provided)
+     */
+    static <T> Iterable<T> concatUniq(Set<T>... sets)
+    {
+        List<Predicate<T>> notIn = new ArrayList<>(sets.length);
+        for (Set<T> set : sets)
+            notIn.add(not(in(set)));
+        List<Iterable<T>> results = new ArrayList<>(sets.length);
+        for (int i = 0 ; i < sets.length ; i++)
+            results.add(filter(sets[i], and(notIn.subList(0, i))));
+        return concat(results);
+    }
+
+    /**
+     * @return a Predicate yielding true for an item present in NONE of the provided sets
+     */
+    static <T> Predicate<T> notIn(Set<T>... sets)
+    {
+        return not(orIn(sets));
+    }
+
+    /**
+     * @return a Predicate yielding true for an item present in ANY of the provided sets
+     */
+    static <T> Predicate<T> orIn(Collection<T>... sets)
+    {
+        Predicate<T>[] orIn = new Predicate[sets.length];
+        for (int i = 0 ; i < orIn.length ; i++)
+            orIn[i] = in(sets[i]);
+        return or(orIn);
+    }
+
+    /**
+     * filter out (i.e. remove) matching elements
+     * @return filter, restricted to those elements that are present in *none* of the provided sets
+     */
+    static <T> Iterable<T> filterOut(Iterable<T> filter, Set<T>... inNone)
+    {
+        return filter(filter, notIn(inNone));
+    }
+
+    /**
+     * filter in (i.e. retain)
+     *
+     * @return filter, restricted to those elements that are present in *any* of the provided sets
+     */
+    static <T> Iterable<T> filterIn(Iterable<T> filter, Set<T>... inAny)
+    {
+        return filter(filter, orIn(inAny));
+    }
+
+    static Set<SSTableReader> emptySet()
+    {
+        return Collections.emptySet();
+    }
+
+    static <T> T select(T t, Collection<T> col)
+    {
+        if (col instanceof Set && !col.contains(t))
+            return null;
+        return getFirst(filter(col, equalTo(t)), null);
+    }
+
+    static <T> T selectFirst(T t, Collection<T>... sets)
+    {
+        for (Collection<T> set : sets)
+        {
+            T select = select(t, set);
+            if (select != null)
+                return select;
+        }
+        return null;
+    }
+
+    static <T> Predicate<T> idIn(Set<T> set)
+    {
+        return idIn(identityMap(set));
+    }
+
+    static <T> Predicate<T> idIn(final Map<T, T> identityMap)
+    {
+        return new Predicate<T>()
+        {
+            public boolean apply(T t)
+            {
+                return identityMap.get(t) == t;
+            }
+        };
+    }
+
+}
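Helpers gathers the set algebra that Tracker and LifecycleTransaction perform on sstable sets, with assertions enforcing invariants the old DataTracker checked ad hoc. A hedged sketch of the three workhorses on plain strings (the values are the editor's; Helpers is package-private, so a real caller would live inside org.apache.cassandra.db.lifecycle):

    Set<String> live = ImmutableSet.of("a", "b", "c");

    // replace: "b" must be present to remove; "d" must be absent to add
    Set<String> next = Helpers.replace(live, ImmutableSet.of("b"), ImmutableSet.of("d"));
    // next == {"a", "c", "d"}

    // filterOut: keep only the elements present in none of the given sets
    Iterable<String> rest = Helpers.filterOut(next, ImmutableSet.of("a"));
    // rest iterates "c", "d"

    // concatUniq: union that keeps the first-encountered instance of each duplicate
    Iterable<String> union = Helpers.concatUniq(ImmutableSet.of("a", "b"), ImmutableSet.of("b", "c"));
    // union iterates "a", "b", "c" (the "b" from the first set)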