Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:34 UTC

[3/7] cassandra git commit: Extend Transactional API to sstable lifecycle management

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index a526ec9..8029075 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,33 +20,28 @@ package org.apache.cassandra.io.sstable;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Refs;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
-import static org.apache.cassandra.utils.Throwables.merge;
-
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
  * we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully
  * flushed to the index file, and then double check that the key is fully present in the flushed data file.
- * Then we move the starts of each reader forwards to that point, replace them in the datatracker, and attach a runnable
+ * Then we move the starts of each reader forwards to that point, replace them in the Tracker, and attach a runnable
  * for on-close (i.e. when all references expire) that drops the page cache prior to that key position
  *
  * hard-links are created for each partially written sstable so that readers opened against them continue to work past
  * the rename of the temporary file, which is deleted once all readers against the hard-link have been closed.
- * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the DataTracker.
+ * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker.
  *
  * On abort we restore the original lower bounds to the existing readers and delete any temporary files we had in progress,
  * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
@@ -74,26 +69,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         return preemptiveOpenInterval;
     }
 
-    private final DataTracker dataTracker;
     private final ColumnFamilyStore cfs;
 
     private final long maxAge;
     private long repairedAt = -1;
     // the set of final readers we will expose on commit
+    private final LifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced)
     private final List<SSTableReader> preparedForCommit = new ArrayList<>();
-    private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
-    private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
     private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
 
-    private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in bytes) in the target file we last (re)opened at
 
-    private final List<Finished> finishedWriters = new ArrayList<>();
-    // as writers are closed from finishedWriters, their last readers are moved into discard, so that abort can cleanup
-    // after us safely; we use a set so we can add in both prepareToCommit and abort
-    private final Set<SSTableReader> discard = new HashSet<>();
-    // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
-    private final boolean isOffline;
+    private final List<SSTableWriter> writers = new ArrayList<>();
+    private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -101,15 +89,11 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     // for testing (TODO: remove when we have byteman setup)
     private boolean throwEarly, throwLate;
 
-    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
+    public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline)
     {
-        this.rewriting = rewriting;
-        for (SSTableReader sstable : rewriting)
-        {
-            originalStarts.put(sstable.descriptor, sstable.first);
+        this.transaction = transaction;
+        for (SSTableReader sstable : this.transaction.originals())
             fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
-        }
-        this.dataTracker = cfs.getDataTracker();
         this.cfs = cfs;
         this.maxAge = maxAge;
         this.isOffline = isOffline;
@@ -134,7 +118,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
             else
             {
                 boolean save = false;
-                for (SSTableReader reader : rewriting)
+                for (SSTableReader reader : transaction.originals())
                 {
                     if (reader.getCachedPosition(row.key, false) != null)
                     {
@@ -170,7 +154,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         {
             if (isOffline)
             {
-                for (SSTableReader reader : rewriting)
+                for (SSTableReader reader : transaction.originals())
                 {
                     RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
                     CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
@@ -181,10 +165,10 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
                 SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
                 if (reader != null)
                 {
-                    replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
-                    currentlyOpenedEarly = reader;
+                    transaction.update(reader, false);
                     currentlyOpenedEarlyAt = writer.getFilePointer();
-                    moveStarts(reader, reader.last, false);
+                    moveStarts(reader, reader.last);
+                    transaction.checkpoint();
                 }
             }
         }
@@ -192,59 +176,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
 
     protected Throwable doAbort(Throwable accumulate)
     {
-        try
-        {
-            moveStarts(null, null, true);
-        }
-        catch (Throwable t)
-        {
-            accumulate = merge(accumulate, t);
-        }
-
-        // cleanup any sstables we prepared for commit
-        for (SSTableReader sstable : preparedForCommit)
-        {
-            try
-            {
-                sstable.markObsolete();
-                sstable.selfRef().release();
-            }
-            catch (Throwable t)
-            {
-                accumulate = merge(accumulate , t);
-            }
-        }
-
-        // abort the writers, and add the early opened readers to our discard pile
-
-        if (writer != null)
-            finishedWriters.add(new Finished(writer, currentlyOpenedEarly));
-
-        for (Finished finished : finishedWriters)
-        {
-            accumulate = finished.writer.abort(accumulate);
-
-            // if we've already been opened, add ourselves to the discard pile
-            if (finished.reader != null)
-                discard.add(finished.reader);
-        }
-
-        accumulate = replaceWithFinishedReaders(Collections.<SSTableReader>emptyList(), accumulate);
+        // abort the writers
+        for (SSTableWriter writer : writers)
+            accumulate = writer.abort(accumulate);
+        // abort the lifecycle transaction
+        accumulate = transaction.abort(accumulate);
         return accumulate;
     }
 
     protected Throwable doCommit(Throwable accumulate)
     {
-        for (Finished f : finishedWriters)
-            accumulate = f.writer.commit(accumulate);
-        accumulate = replaceWithFinishedReaders(preparedForCommit, accumulate);
-
-        return accumulate;
-    }
-
-    protected Throwable doCleanup(Throwable accumulate)
-    {
-        // we have no state of our own to cleanup; Transactional objects cleanup their own state in abort or commit
+        for (SSTableWriter writer : writers)
+            accumulate = writer.commit(accumulate);
+        accumulate = transaction.commit(accumulate);
         return accumulate;
     }
 
@@ -260,100 +204,70 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
      *
      * @param newReader the rewritten reader that replaces them for this region
      * @param lowerbound must be non-null, and marks the exclusive lowerbound of the start for each sstable
-     * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
      */
-    private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound, boolean reset)
+    private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
     {
         if (isOffline)
             return;
         if (preemptiveOpenInterval == Long.MAX_VALUE)
             return;
 
-        List<SSTableReader> toReplace = new ArrayList<>();
-        List<SSTableReader> replaceWith = new ArrayList<>();
         final List<DecoratedKey> invalidateKeys = new ArrayList<>();
-        if (!reset)
-        {
-            invalidateKeys.addAll(cachedKeys.keySet());
-            for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
-                newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
-        }
+        invalidateKeys.addAll(cachedKeys.keySet());
+        for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+            newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
 
         cachedKeys = new HashMap<>();
-        for (SSTableReader sstable : ImmutableList.copyOf(rewriting))
+        for (SSTableReader sstable : transaction.originals())
         {
             // we call transaction.current() to support multiple rewriters operating over the same source readers at once.
             // note: only one such writer should be written to at any moment
-            final SSTableReader latest = sstable.getCurrentReplacement();
-            SSTableReader replacement;
-            if (reset)
-            {
-                DecoratedKey newStart = originalStarts.get(sstable.descriptor);
-                replacement = latest.cloneWithNewStart(newStart, null);
-            }
-            else
-            {
-                // skip any sstables that we know to already be shadowed
-                if (latest.openReason == SSTableReader.OpenReason.SHADOWED)
-                    continue;
-                if (latest.first.compareTo(lowerbound) > 0)
-                    continue;
+            final SSTableReader latest = transaction.current(sstable);
 
-                final Runnable runOnClose = new Runnable()
-                {
-                    public void run()
-                    {
-                        // this is somewhat racey, in that we could theoretically be closing this old reader
-                        // when an even older reader is still in use, but it's not likely to have any major impact
-                        for (DecoratedKey key : invalidateKeys)
-                            latest.invalidateCacheKey(key);
-                    }
-                };
+            // skip any sstables that we know to already be shadowed
+            if (latest.first.compareTo(lowerbound) > 0)
+                continue;
 
-                if (lowerbound.compareTo(latest.last) >= 0)
+            final Runnable runOnClose = new Runnable()
+            {
+                public void run()
                 {
-                    replacement = latest.cloneAsShadowed(runOnClose);
+                    // this is somewhat racey, in that we could theoretically be closing this old reader
+                    // when an even older reader is still in use, but it's not likely to have any major impact
+                    for (DecoratedKey key : invalidateKeys)
+                        latest.invalidateCacheKey(key);
                 }
-                else
+            };
+
+            if (lowerbound.compareTo(latest.last) >= 0)
+            {
+                if (!transaction.isObsolete(latest))
                 {
-                    DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
-                    assert newStart != null;
-                    replacement = latest.cloneWithNewStart(newStart, runOnClose);
+                    latest.runOnClose(runOnClose);
+                    transaction.obsolete(latest);
                 }
+                continue;
             }
 
-            toReplace.add(latest);
-            replaceWith.add(replacement);
-            rewriting.remove(sstable);
-            rewriting.add(replacement);
+            DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
+            assert newStart != null;
+            SSTableReader replacement = latest.cloneWithNewStart(newStart, runOnClose);
+            transaction.update(replacement, true);
         }
-        cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
-    }
-
-    private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
-    {
-        if (isOffline)
-            return;
-        Set<SSTableReader> toReplaceSet;
-        if (toReplace != null)
-        {
-            toReplace.setReplacedBy(replaceWith);
-            toReplaceSet = Collections.singleton(toReplace);
-        }
-        else
-        {
-            dataTracker.markCompacting(Collections.singleton(replaceWith), true, isOffline);
-            toReplaceSet = Collections.emptySet();
-        }
-        dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
     }
 
     public void switchWriter(SSTableWriter newWriter)
     {
+        if (newWriter != null)
+            writers.add(newWriter.setMaxDataAge(maxAge));
+
         if (writer == null || writer.getFilePointer() == 0)
         {
             if (writer != null)
+            {
                 writer.abort();
+                writers.remove(writer);
+            }
             writer = newWriter;
             return;
         }
@@ -361,14 +275,13 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         SSTableReader reader = null;
         if (preemptiveOpenInterval != Long.MAX_VALUE)
         {
-            // we leave it as a tmp file, but we open it and add it to the dataTracker
+            // we leave it as a tmp file, but we open it and add it to the Tracker
             reader = writer.setMaxDataAge(maxAge).openFinalEarly();
-            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
-            moveStarts(reader, reader.last, false);
+            transaction.update(reader, false);
+            moveStarts(reader, reader.last);
+            transaction.checkpoint();
         }
-        finishedWriters.add(new Finished(writer, reader));
 
-        currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
     }
@@ -387,12 +300,12 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     /**
      * Finishes the new file(s)
      *
-     * Creates final files, adds the new files to the dataTracker (via replaceReader).
+     * Creates final files, adds the new files to the Tracker (via replaceReader).
      *
      * We add them to the tracker to be able to get rid of the tmpfiles
      *
      * It is up to the caller to do the compacted sstables replacement
-     * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     * gymnastics (ie, call Tracker#markCompactedSSTablesReplaced(..))
      *
      *
      */
@@ -402,6 +315,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         return finished();
     }
 
+    // returns, in list form, the new sstables we have written and prepared for commit
     public List<SSTableReader> finished()
     {
         assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
@@ -416,82 +330,31 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
             throw new RuntimeException("exception thrown early in finish, for testing");
 
         // No early open to finalize and replace
-        for (Finished f : finishedWriters)
+        for (SSTableWriter writer : writers)
         {
-            if (f.reader != null)
-                discard.add(f.reader);
-
-            f.writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit();
-            SSTableReader newReader = f.writer.finished();
-
-            if (f.reader != null)
-                f.reader.setReplacedBy(newReader);
-
-            preparedForCommit.add(newReader);
+            assert writer.getFilePointer() > 0;
+            writer.setRepairedAt(repairedAt).setOpenResult(true).prepareToCommit();
+            SSTableReader reader = writer.finished();
+            transaction.update(reader, false);
+            preparedForCommit.add(reader);
         }
+        transaction.checkpoint();
 
         if (throwLate)
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
-    }
 
-    @VisibleForTesting
-    void throwDuringPrepare(boolean throwEarly)
-    {
-        this.throwEarly = throwEarly;
-        this.throwLate = !throwEarly;
-    }
+        // TODO: do we always want to avoid obsoleting if offline?
+        if (!isOffline)
+            transaction.obsoleteOriginals();
 
-    // cleanup all our temporary readers and swap in our new ones
-    private Throwable replaceWithFinishedReaders(List<SSTableReader> finished, Throwable accumulate)
-    {
-        if (isOffline)
-        {
-            for (SSTableReader reader : discard)
-            {
-                try
-                {
-                    if (reader.getCurrentReplacement() == reader)
-                        reader.markObsolete();
-                }
-                catch (Throwable t)
-                {
-                    accumulate = merge(accumulate, t);
-                }
-            }
-            accumulate = Refs.release(Refs.selfRefs(discard), accumulate);
-        }
-        else
-        {
-            try
-            {
-                dataTracker.replaceEarlyOpenedFiles(discard, finished);
-            }
-            catch (Throwable t)
-            {
-                accumulate = merge(accumulate, t);
-            }
-            try
-            {
-                dataTracker.unmarkCompacting(discard);
-            }
-            catch (Throwable t)
-            {
-                accumulate = merge(accumulate, t);
-            }
-        }
-        discard.clear();
-        return accumulate;
+        transaction.prepareToCommit();
     }
 
-    private static final class Finished
+    public void throwDuringPrepare(boolean earlyException)
     {
-        final SSTableWriter writer;
-        final SSTableReader reader;
-
-        private Finished(SSTableWriter writer, SSTableReader reader)
-        {
-            this.writer = writer;
-            this.reader = reader;
-        }
+        if (earlyException)
+            throwEarly = true;
+        else
+            throwLate = true;
     }
 }

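With this patch, SSTableRewriter no longer touches the tracker directly: the caller hands it a LifecycleTransaction, and committing or aborting the rewriter commits or aborts that transaction along with the writers. A minimal sketch of the new calling pattern (assumptions: the output writer and row iterator are supplied by the caller, append(AbstractCompactedRow) is assumed from the pre-existing rewriter API, and closing an already-committed transaction is assumed to be a no-op, cf. permitRedundantTransitions in Transactional further down):

    static List<SSTableReader> rewrite(ColumnFamilyStore cfs, Set<SSTableReader> sstables,
                                       SSTableWriter writer, Iterator<AbstractCompactedRow> rows, long maxAge)
    {
        // tryModify returns null if any sstable is already part of another transaction
        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION))
        {
            if (txn == null)
                throw new IllegalStateException("sstables are already being rewritten");
            SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, maxAge, false);
            rewriter.switchWriter(writer);        // install the first output writer
            while (rows.hasNext())
                rewriter.append(rows.next());     // assumed from the existing append API
            // finish() runs prepareToCommit()/commit(), committing the writers and the
            // wrapped transaction together (see doCommit above)
            return rewriter.finish();
        }
    }
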
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 23c27b0..8e701b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.*;
@@ -122,7 +123,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
  * managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are
  * cleaned up safely and can be debugged otherwise.
  *
- * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
+ * TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies
  */
 public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
@@ -141,6 +142,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     };
 
+    // it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
+    public static final class UniqueIdentifier {}
+
     public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
     {
         public int compare(SSTableReader o1, SSTableReader o2)
@@ -170,11 +174,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         NORMAL,
         EARLY,
         METADATA_CHANGE,
-        MOVED_START,
-        SHADOWED // => MOVED_START past end
+        MOVED_START
     }
 
     public final OpenReason openReason;
+    public final UniqueIdentifier instanceId = new UniqueIdentifier();
 
     // indexfile and datafile: might be null before a call to load()
     protected SegmentedFile ifile;
@@ -594,9 +598,22 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return ifile.path();
     }
 
-    public void setTrackedBy(DataTracker tracker)
+    // this is only used for restoring tracker state at delete (and wiring up the keycache) and so
+    // should only be called once it is actually added to the tracker
+    public void setupDeleteNotification(Tracker tracker)
     {
         tidy.type.deletingTask.setTracker(tracker);
+        setupKeyCache();
+    }
+
+    @VisibleForTesting
+    public boolean isDeleteNotificationSetup()
+    {
+        return tidy.type.deletingTask.getTracker() != null;
+    }
+
+    public void setupKeyCache()
+    {
         // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
         // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
         // here when we know we're being wired into the rest of the server infrastructure.
@@ -908,15 +925,38 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     }
 
-    public void setReplacedBy(SSTableReader replacement)
+    public void setReplaced()
     {
         synchronized (tidy.global)
         {
-            assert replacement != null;
             assert !tidy.isReplaced;
-            assert tidy.global.live == this;
             tidy.isReplaced = true;
-            tidy.global.live = replacement;
+        }
+    }
+
+    public boolean isReplaced()
+    {
+        synchronized (tidy.global)
+        {
+            return tidy.isReplaced;
+        }
+    }
+
+    public void runOnClose(final Runnable runOnClose)
+    {
+        synchronized (tidy.global)
+        {
+            final Runnable existing = tidy.runOnClose;
+            tidy.runOnClose = existing == null
+                              ? runOnClose
+                              : new Runnable()
+                                {
+                                    public void run()
+                                    {
+                                        existing.run();
+                                        runOnClose.run();
+                                    }
+                                };
         }
     }
 
@@ -948,32 +988,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
             replacement.first = newStart;
             replacement.last = this.last;
-            setReplacedBy(replacement);
-            return replacement;
-        }
-    }
-
-    public SSTableReader cloneAsShadowed(final Runnable runOnClose)
-    {
-        synchronized (tidy.global)
-        {
-            assert openReason != OpenReason.EARLY;
-            this.tidy.runOnClose = new Runnable()
-            {
-                public void run()
-                {
-                    dfile.dropPageCache(0);
-                    ifile.dropPageCache(0);
-                    runOnClose.run();
-                }
-            };
-
-            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
-                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
-                                                          maxDataAge, sstableMetadata, OpenReason.SHADOWED);
-            replacement.first = first;
-            replacement.last = last;
-            setReplacedBy(replacement);
             return replacement;
         }
     }
@@ -1036,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                                      sstableMetadata, OpenReason.METADATA_CHANGE);
             replacement.first = this.first;
             replacement.last = this.last;
-            setReplacedBy(replacement);
             return replacement;
         }
     }
@@ -1520,7 +1533,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * except for threads holding a reference.
      *
      * @return true if the this is the first time the file was marked obsolete.  Calling this
-     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
+     * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize).
      */
     public boolean markObsolete()
     {
@@ -1638,11 +1651,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
     }
 
-    public SSTableReader getCurrentReplacement()
-    {
-        return tidy.global.live;
-    }
-
     /**
      * TODO: Move someplace reusable
      */
@@ -2048,8 +2056,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
 
         private final Descriptor desc;
-        // a single convenience property for getting the most recent version of an sstable, not related to tidying
-        private SSTableReader live;
         // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
         // at once also, for testing purposes
         private RestorableMeter readMeter;
@@ -2064,7 +2070,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         {
             this.desc = reader.descriptor;
             this.isCompacted = new AtomicBoolean();
-            this.live = reader;
         }
 
         void ensureReadMeter()
@@ -2128,6 +2133,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     }
 
+    @VisibleForTesting
+    public static void resetTidying()
+    {
+        GlobalTidy.lookup.clear();
+        DescriptorTypeTidy.lookup.clear();
+    }
+
     public static abstract class Factory
     {
         public abstract SSTableReader open(final Descriptor descriptor,

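Note that runOnClose(Runnable) above composes with any hook already registered rather than replacing it, which is what lets SSTableRewriter.moveStarts keep attaching cache invalidation as readers are repeatedly replaced. A small sketch of the pattern (the hook body is illustrative):

    static void attachInvalidation(final SSTableReader reader, final List<DecoratedKey> invalidateKeys)
    {
        // each call wraps the previously registered hook, so every hook runs when the reader closes
        reader.runOnClose(new Runnable()
        {
            public void run()
            {
                for (DecoratedKey key : invalidateKeys)
                    reader.invalidateCacheKey(key);
            }
        });
    }
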
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index fa17c20..a7a7fcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -377,7 +377,8 @@ public class BigTableWriter extends SSTableWriter
             return accumulate;
         }
 
-        protected Throwable doCleanup(Throwable accumulate)
+        @Override
+        protected Throwable doPreCleanup(Throwable accumulate)
         {
             accumulate = dbuilder.close(accumulate);
             return accumulate;
@@ -562,7 +563,8 @@ public class BigTableWriter extends SSTableWriter
             return indexFile.abort(accumulate);
         }
 
-        protected Throwable doCleanup(Throwable accumulate)
+        @Override
+        protected Throwable doPreCleanup(Throwable accumulate)
         {
             accumulate = summary.close(accumulate);
             accumulate = bf.close(accumulate);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index d63be31..3c35a34 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -77,7 +77,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     protected class TransactionalProxy extends AbstractTransactional
     {
         @Override
-        protected Throwable doCleanup(Throwable accumulate)
+        protected Throwable doPreCleanup(Throwable accumulate)
         {
             if (directoryFD >= 0)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index 842d06d..4ab4446 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -215,28 +215,28 @@ public class ColumnFamilyMetrics
         {
             public Long getValue()
             {
-                return cfs.getDataTracker().getView().getCurrentMemtable().getOperations();
+                return cfs.getTracker().getView().getCurrentMemtable().getOperations();
             }
         });
         memtableOnHeapSize = createColumnFamilyGauge("MemtableOnHeapSize", new Gauge<Long>()
         {
             public Long getValue()
             {
-                return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
+                return cfs.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
             }
         });
         memtableOffHeapSize = createColumnFamilyGauge("MemtableOffHeapSize", new Gauge<Long>()
         {
             public Long getValue()
             {
-                return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
+                return cfs.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
             }
         });
         memtableLiveDataSize = createColumnFamilyGauge("MemtableLiveDataSize", new Gauge<Long>()
         {
             public Long getValue()
             {
-                return cfs.getDataTracker().getView().getCurrentMemtable().getLiveDataSize();
+                return cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize();
             }
         });
         allMemtablesOnHeapSize = createColumnFamilyGauge("AllMemtablesHeapSize", new Gauge<Long>()
@@ -245,7 +245,7 @@ public class ColumnFamilyMetrics
             {
                 long size = 0;
                 for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
+                    size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
                 return size;
             }
         });
@@ -255,7 +255,7 @@ public class ColumnFamilyMetrics
             {
                 long size = 0;
                 for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
+                    size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
                 return size;
             }
         });
@@ -265,7 +265,7 @@ public class ColumnFamilyMetrics
             {
                 long size = 0;
                 for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getDataTracker().getView().getCurrentMemtable().getLiveDataSize();
+                    size += cfs2.getTracker().getView().getCurrentMemtable().getLiveDataSize();
                 return size;
             }
         });
@@ -288,7 +288,7 @@ public class ColumnFamilyMetrics
             public Long getValue()
             {
                 long memtablePartitions = 0;
-                for (Memtable memtable : cfs.getDataTracker().getView().getAllMemtables())
+                for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
                     memtablePartitions += memtable.partitionCount();
                 return SSTableReader.getApproximateKeyCount(cfs.getSSTables()) + memtablePartitions;
             }
@@ -358,7 +358,7 @@ public class ColumnFamilyMetrics
         {
             public Integer getValue()
             {
-                return cfs.getDataTracker().getSSTables().size();
+                return cfs.getTracker().getSSTables().size();
             }
         });
         liveDiskSpaceUsed = createColumnFamilyCounter("LiveDiskSpaceUsed");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 6a70692..44522db 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,13 +28,14 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+
+import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -318,9 +319,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(Range.makeRowRange(range));
-                refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
+                refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
                 {
-                    public List<SSTableReader> apply(DataTracker.View view)
+                    public List<SSTableReader> apply(View view)
                     {
                         List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
                         Set<SSTableReader> sstables = Sets.newHashSet();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 9f26637..d32ef88 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -32,12 +32,8 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.compaction.LeveledManifest;
-import org.apache.cassandra.db.compaction.Scrubber;
-import org.apache.cassandra.db.compaction.WrappingCompactionStrategy;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -121,9 +117,9 @@ public class StandaloneScrubber
             {
                 for (SSTableReader sstable : sstables)
                 {
-                    try
+                    try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
                     {
-                        Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted, handler, true, !options.noValidate);
+                        Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate);
                         try
                         {
                             scrubber.scrub();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 2541d6e..e881133 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -27,10 +27,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -145,12 +147,11 @@ public class StandaloneSplitter
             if (options.snapshot)
                 System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
 
-            cfs.getDataTracker().markCompacting(sstables, false, true);
             for (SSTableReader sstable : sstables)
             {
-                try
+                try (LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.UNKNOWN, sstable))
                 {
-                    new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+                    new SSTableSplitter(cfs, transaction, options.sizeInMB).split();
 
                     // Remove the sstable (it's been copied by split and snapshotted)
                     sstable.markObsolete();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 409a5f0..626d429 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -29,7 +29,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.Upgrader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
@@ -98,9 +100,9 @@ public class StandaloneUpgrader
 
             for (SSTableReader sstable : readers)
             {
-                try
+                try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UPGRADE_SSTABLES, sstable))
                 {
-                    Upgrader upgrader = new Upgrader(cfs, sstable, handler);
+                    Upgrader upgrader = new Upgrader(cfs, txn, handler);
                     upgrader.upgrade();
 
                     if (!options.keepSource)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Blocker.java b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
new file mode 100644
index 0000000..5192e98
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Blocker
+{
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition unblocked = lock.newCondition();
+    private volatile boolean block = false;
+
+    public void block(boolean block)
+    {
+        this.block = block;
+        if (!block)
+        {
+            lock.lock();
+            try
+            {
+                unblocked.signalAll();
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+    }
+
+    public void ask()
+    {
+        if (block)
+        {
+            lock.lock();
+            try
+            {
+                while (block)
+                    unblocked.awaitUninterruptibly();
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+    }
+}

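Blocker is a small test utility: threads call ask() at a checkpoint and park while block(true) is in effect, until block(false) releases them. A usage sketch (the gated work is illustrative):

    static void pauseWorkerAtCheckpoint(final Blocker blocker) throws InterruptedException
    {
        blocker.block(true);               // any thread reaching ask() will now park
        Thread worker = new Thread(new Runnable()
        {
            public void run()
            {
                blocker.ask();             // parks here until block(false) is called
                // ... perform the gated work ...
            }
        });
        worker.start();
        // ... inspect state while the worker is held at the checkpoint ...
        blocker.block(false);              // signals all threads parked in ask()
        worker.join();
    }
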
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index bcf5095..5b0eb8e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -70,6 +70,7 @@ public interface Transactional extends AutoCloseable
             ABORTED;
         }
 
+        private boolean permitRedundantTransitions;
         private State state = State.IN_PROGRESS;
 
         // the methods for actually performing the necessary behaviours, that are themselves protected against
@@ -79,9 +80,18 @@ public interface Transactional extends AutoCloseable
         protected abstract Throwable doCommit(Throwable accumulate);
         protected abstract Throwable doAbort(Throwable accumulate);
 
-        // this only needs to perform cleanup of state unique to this instance; any internal
+        // these only need to perform cleanup of state unique to this instance; any internal
         // Transactional objects will perform cleanup in the commit() or abort() calls
-        protected abstract Throwable doCleanup(Throwable accumulate);
+
+        /**
+         * perform an exception-safe pre-abort cleanup; this will still be run *after* commit
+         */
+        protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; }
+
+        /**
+         * perform an exception-safe post-abort cleanup
+         */
+        protected Throwable doPostCleanup(Throwable accumulate){ return accumulate; }
 
         /**
          * Do any preparatory work prior to commit. This method should throw any exceptions that can be encountered
@@ -94,10 +104,13 @@ public interface Transactional extends AutoCloseable
          */
         public final Throwable commit(Throwable accumulate)
         {
+            if (permitRedundantTransitions && state == State.COMMITTED)
+                return accumulate;
             if (state != State.READY_TO_COMMIT)
-                throw new IllegalStateException("Commit attempted before prepared to commit");
+                throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state);
             accumulate = doCommit(accumulate);
-            accumulate = doCleanup(accumulate);
+            accumulate = doPreCleanup(accumulate);
+            accumulate = doPostCleanup(accumulate);
             state = State.COMMITTED;
             return accumulate;
         }
@@ -123,8 +136,9 @@ public interface Transactional extends AutoCloseable
             }
             state = State.ABORTED;
             // we cleanup first so that, e.g., file handles can be released prior to deletion
-            accumulate = doCleanup(accumulate);
+            accumulate = doPreCleanup(accumulate);
             accumulate = doAbort(accumulate);
+            accumulate = doPostCleanup(accumulate);
             return accumulate;
         }
 
@@ -147,6 +161,8 @@ public interface Transactional extends AutoCloseable
          */
         public final void prepareToCommit()
         {
+            if (permitRedundantTransitions && state == State.READY_TO_COMMIT)
+                return;
             if (state != State.IN_PROGRESS)
                 throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
 
@@ -183,6 +199,11 @@ public interface Transactional extends AutoCloseable
         {
             return state;
         }
+
+        protected void permitRedundantTransitions()
+        {
+            permitRedundantTransitions = true;
+        }
     }
 
     // commit should generally never throw an exception, and preferably never generate one,

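To make the doCleanup split concrete: commit() runs doCommit, doPreCleanup, doPostCleanup in that order, while abort() runs doPreCleanup, doAbort, doPostCleanup, so handles released in doPreCleanup are closed before doAbort deletes anything. A minimal sketch of a subclass (assumptions: the pre-existing abstract doPrepare() hook, which is not visible in this hunk, and Throwables.merge from the utils package; the class itself is hypothetical):

    import java.io.File;
    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.cassandra.utils.concurrent.Transactional;
    import static org.apache.cassandra.utils.Throwables.merge;

    // a hypothetical transactional temporary file, showing where each hook fits
    class TempFile extends Transactional.AbstractTransactional implements Transactional
    {
        private final File file;
        private OutputStream out;

        TempFile(File file, OutputStream out) { this.file = file; this.out = out; }

        protected void doPrepare() { /* flush and sync out */ }

        protected Throwable doCommit(Throwable accumulate)
        {
            return accumulate;             // e.g. rename file into its final location
        }

        protected Throwable doAbort(Throwable accumulate)
        {
            // runs after doPreCleanup, so the handle is already closed before deletion
            if (!file.delete())
                accumulate = merge(accumulate, new IOException("failed to delete " + file));
            return accumulate;
        }

        protected Throwable doPreCleanup(Throwable accumulate)
        {
            try
            {
                if (out != null)
                    out.close();
            }
            catch (Throwable t)
            {
                accumulate = merge(accumulate, t);
            }
            return accumulate;
        }
    }

Callers then drive the usual state machine: prepareToCommit(), followed by commit(null) on success, or abort(null) on failure.
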
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index bf71639..e6c8f56 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -18,9 +18,7 @@
 */
 package org.apache.cassandra.db.compaction;
 
-import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -35,6 +33,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -126,8 +125,11 @@ public class LongCompactionsTest
 
         long start = System.nanoTime();
         final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
-        assert store.getDataTracker().markCompacting(sstables): "Cannot markCompacting all sstables";
-        new CompactionTask(store, sstables, gcBefore, false).execute(null);
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION))
+        {
+            assert txn != null : "Cannot markCompacting all sstables";
+            new CompactionTask(store, txn, gcBefore, false).execute(null);
+        }
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
new file mode 100644
index 0000000..bc236e1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -0,0 +1,167 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.SimpleSparseCellNameType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedSegmentedFile;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class MockSchema
+{
+    static
+    {
+        Memory offsets = Memory.allocate(4);
+        offsets.setInt(0, 0);
+        indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1);
+    }
+    private static final AtomicInteger id = new AtomicInteger();
+    public static final Keyspace ks = Keyspace.mockKS(new KSMetaData("mockks", SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), false));
+    public static final ColumnFamilyStore cfs = newCFS();
+
+    private static final IndexSummary indexSummary;
+    private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);
+
+    public static Memtable memtable()
+    {
+        return new Memtable(cfs.metadata);
+    }
+
+    public static SSTableReader sstable(int generation)
+    {
+        return sstable(generation, false);
+    }
+
+    public static SSTableReader sstable(int generation, boolean keepRef)
+    {
+        return sstable(generation, 0, keepRef);
+    }
+
+    public static SSTableReader sstable(int generation, int size)
+    {
+        return sstable(generation, size, false);
+    }
+
+    public static SSTableReader sstable(int generation, int size, boolean keepRef)
+    {
+        return sstable(generation, size, keepRef, cfs);
+    }
+    public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
+    {
+        Descriptor descriptor = new Descriptor(temp("mockcfdir").getParentFile(), "mockks", "mockcf", generation, Descriptor.Type.FINAL);
+        Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
+        for (Component component : components)
+        {
+            File file = new File(descriptor.filenameFor(component));
+            try
+            {
+                file.createNewFile();
+            }
+            catch (IOException e)
+            {
+                // ignored: the mock component file may already exist
+            }
+            file.deleteOnExit();
+        }
+        if (size > 0)
+        {
+            try
+            {
+                File file = new File(descriptor.filenameFor(Component.DATA));
+                try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+                {
+                    raf.setLength(size);
+                }
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+                                                 .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1)
+                                                 .get(MetadataType.STATS);
+        SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance,
+                                                          segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
+                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL);
+        reader.first = reader.last = readerBounds(generation);
+        if (!keepRef)
+            reader.selfRef().release();
+        return reader;
+    }
+
+    public static ColumnFamilyStore newCFS()
+    {
+        String cfname = "mockcf" + (id.incrementAndGet());
+        CFMetaData metadata = newCFMetaData(cfname);
+        return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false);
+    }
+
+    private static CFMetaData newCFMetaData(String cfname)
+    {
+        CFMetaData metadata = new CFMetaData("mockks", cfname, ColumnFamilyType.Standard, new SimpleSparseCellNameType(UTF8Type.instance));
+        metadata.caching(CachingOptions.NONE);
+        return metadata;
+    }
+
+    public static BufferDecoratedKey readerBounds(int generation)
+    {
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
+    private static File temp(String id)
+    {
+        try
+        {
+            File file = File.createTempFile(id, "tmp");
+            file.deleteOnExit();
+            return file;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
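
For readers new to this fixture: MockSchema hands back a real SSTableReader whose component files are empty temp files, so reference counting and Tracker interactions behave normally while nothing is ever read from disk. A minimal sketch of driving it directly (not part of the patch; it assumes the usual unit-test JVM setup, e.g. an initialised DatabaseDescriptor):

    import org.apache.cassandra.io.sstable.format.SSTableReader;

    public class MockSchemaSketch
    {
        public static void main(String[] args)
        {
            // a mock reader over a 1KB data file; keepRef means we own a reference
            SSTableReader reader = MockSchema.sstable(1, 1024, true);
            // first == last == the token derived from the generation
            assert reader.first.equals(MockSchema.readerBounds(1));
            assert !reader.isMarkedCompacted();
            reader.selfRef().release(); // drop our reference once the test is done
        }
    }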

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 9b8e5df..c2205c4 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -24,16 +24,17 @@ import java.io.*;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.nio.channels.FileChannel;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionTask;
@@ -44,17 +45,27 @@ import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
+import org.apache.hadoop.fs.FileUtil;
 
 import static org.junit.Assert.assertTrue;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index e5fd470..27e7a2b 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -43,7 +43,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.cassandra.utils.concurrent.Refs;
 import static org.junit.Assert.assertEquals;
@@ -171,7 +170,7 @@ public class KeyCacheTest
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
-        Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+        Set<SSTableReader> readers = cfs.getTracker().getSSTables();
         Refs<SSTableReader> refs = Refs.tryRef(readers);
         if (refs == null)
             throw new IllegalStateException();
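
The tryRef dance above is the standard way to pin a snapshot of readers: it fails softly, returning null, if any reader was released between taking the snapshot and grabbing the refs. The idiom in isolation (a sketch, not part of the patch):

    import java.util.Set;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.utils.concurrent.Refs;

    final class PinnedReadersSketch
    {
        static void withPinnedReaders(ColumnFamilyStore cfs)
        {
            Set<SSTableReader> readers = cfs.getTracker().getSSTables();
            try (Refs<SSTableReader> refs = Refs.tryRef(readers))
            {
                if (refs == null)
                    throw new IllegalStateException("a reader was released before we could pin it");
                // safe to use the pinned readers here; close() releases them all
            }
        }
    }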

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index a5af823..dbbce9e 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,57 +20,63 @@ package org.apache.cassandra.db;
  *
  */
 
-import java.io.*;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
 
 import static org.apache.cassandra.Util.cellname;
 import static org.apache.cassandra.Util.column;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class ScrubTest
@@ -155,7 +161,8 @@ public class ScrubTest
         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
 
         // with skipCorrupted == false, the scrub is expected to fail
-        try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true))
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+             Scrubber scrubber = new Scrubber(cfs, txn, false, false, true))
         {
             scrubber.scrub();
             fail("Expected a CorruptSSTableException to be thrown");
@@ -164,7 +171,8 @@ public class ScrubTest
 
         // with skipCorrupted == true, the corrupt rows will be skipped
         Scrubber.ScrubResult scrubResult;
-        try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false, true))
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+             Scrubber scrubber = new Scrubber(cfs, txn, true, false, true))
         {
             scrubResult = scrubber.scrubWithResult();
         }
@@ -213,20 +221,24 @@ public class ScrubTest
         overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
 
         // with skipCorrupted == false, the scrub is expected to fail
-        Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true);
-        try
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+             Scrubber scrubber = new Scrubber(cfs, txn, false, false, true))
         {
+            // skipCorrupted == false: the corrupt row should make scrub throw
             scrubber.scrub();
             fail("Expected a CorruptSSTableException to be thrown");
         }
         catch (IOError err) {}
 
-        // with skipCorrupted == true, the corrupt row will be skipped
-        scrubber = new Scrubber(cfs, sstable, true, false, true);
-        scrubber.scrub();
-        scrubber.close();
-        assertEquals(1, cfs.getSSTables().size());
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+             Scrubber scrubber = new Scrubber(cfs, txn, true, false, true))
+        {
+            // with skipCorrupted == true, the corrupt row will be skipped
+            scrubber.scrub();
+        }
 
+        assertEquals(1, cfs.getSSTables().size());
         // verify that we can read all of the rows, and there is now one less row
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
         assertEquals(1, rows.size());
@@ -369,9 +381,13 @@ public class ScrubTest
         components.add(Component.STATS);
         components.add(Component.SUMMARY);
         components.add(Component.TOC);
+
         SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+        if (sstable.last.compareTo(sstable.first) < 0)
+            sstable.last = sstable.first;
 
-        try(Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true))
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
+             Scrubber scrubber = new Scrubber(cfs, txn, false, true, true))
         {
             scrubber.scrub();
         }
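
The recurring shape in these hunks: a Scrubber no longer marks sstables compacting itself; the caller hands it a LifecycleTransaction, taken either from the live Tracker or, for standalone scrubs, from LifecycleTransaction.offline as in the last hunk. A condensed sketch (not part of the patch; it assumes the sstable is not already being compacted, otherwise tryModify returns null and the Scrubber constructor would fail):

    import java.util.Arrays;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.compaction.Scrubber;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class ScrubSketch
    {
        static void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable) throws Exception
        {
            try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
                 Scrubber scrubber = new Scrubber(cfs, txn, true /* skipCorrupted */, false, true))
            {
                scrubber.scrub(); // both resources are released by try-with-resources
            }
        }
    }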

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 1dc72ae..235462b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import junit.framework.Assert;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.utils.concurrent.Refs;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -91,13 +90,18 @@ public class AntiCompactionTest
         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        Refs<SSTableReader> refs = Refs.ref(sstables);
-        long repairedAt = 1000;
-        CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
-
-        assertEquals(2, store.getSSTables().size());
         int repairedKeys = 0;
         int nonRepairedKeys = 0;
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            if (txn == null)
+                throw new IllegalStateException();
+            long repairedAt = 1000;
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+        }
+
+        assertEquals(2, store.getSSTables().size());
         for (SSTableReader sstable : store.getSSTables())
         {
             try (ISSTableScanner scanner = sstable.getScanner())
@@ -123,7 +127,7 @@ public class AntiCompactionTest
             assertFalse(sstable.isMarkedCompacted());
             assertEquals(1, sstable.selfRef().globalCount());
         }
-        assertEquals(0, store.getDataTracker().getCompacting().size());
+        assertEquals(0, store.getTracker().getCompacting().size());
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
@@ -139,13 +143,16 @@ public class AntiCompactionTest
         long origSize = s.bytesOnDisk();
         Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345);
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345);
+        }
         long sum = 0;
         for (SSTableReader x : cfs.getSSTables())
             sum += x.bytesOnDisk();
         assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount());
         assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.getCount(), 100000);
-
     }
 
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
@@ -210,10 +217,12 @@ public class AntiCompactionTest
         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        Refs<SSTableReader> refs = Refs.tryRef(sstables);
-        Assert.assertNotNull(refs);
         long repairedAt = 1000;
-        CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+        }
         /*
         Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
         so there will be no net change in the number of sstables
@@ -256,12 +265,16 @@ public class AntiCompactionTest
         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        CompactionManager.instance.performAnticompaction(store, ranges, Refs.tryRef(sstables), 1);
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+        }
 
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
         assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
-        assertThat(store.getDataTracker().getCompacting().size(), is(0));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 
 
@@ -282,8 +295,12 @@ public class AntiCompactionTest
         Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
         List<Range<Token>> ranges = Arrays.asList(range);
 
-        Refs<SSTableReader> refs = Refs.ref(sstables);
-        CompactionManager.instance.performAnticompaction(store, ranges, refs, 0);
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 0);
+        }
 
         assertThat(store.getSSTables().size(), is(10));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
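
performAnticompaction now takes both the Refs, for reference counting, and the LifecycleTransaction, for ownership of the compacting state, and the tests above consistently scope the two together. In sketch form (not part of the patch; error handling beyond the null check is elided):

    import java.util.Collection;
    import java.util.List;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.compaction.CompactionManager;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.utils.concurrent.Refs;

    final class AntiCompactionSketch
    {
        static void anticompact(ColumnFamilyStore store, List<Range<Token>> ranges, long repairedAt) throws Exception
        {
            Collection<SSTableReader> sstables = store.getSSTables();
            try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
                 Refs<SSTableReader> refs = Refs.ref(sstables))
            {
                if (txn == null) // another operation already owns some of these sstables
                    throw new IllegalStateException();
                CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
            }
        }
    }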

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 88074af..235fd49 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 
 import com.google.common.primitives.Longs;
 import org.junit.Before;
@@ -41,11 +40,12 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
 import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
 import static org.junit.Assert.assertEquals;
 
 public class CompactionAwareWriterTest
@@ -81,10 +81,10 @@ public class CompactionAwareWriterTest
         int rowCount = 1000;
         cfs.disableAutoCompaction();
         populate(cfs, rowCount);
-        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
-        long beforeSize = sstables.iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, sstables, sstables, false, OperationType.COMPACTION);
-        int rows = compact(cfs, sstables, writer);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+        long beforeSize = txn.originals().iterator().next().onDiskLength();
+        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION);
+        int rows = compact(cfs, txn, writer);
         assertEquals(1, cfs.getSSTables().size());
         assertEquals(rowCount, rows);
         assertEquals(beforeSize, cfs.getSSTables().iterator().next().onDiskLength());
@@ -100,11 +100,11 @@ public class CompactionAwareWriterTest
         cfs.disableAutoCompaction();
         int rowCount = 1000;
         populate(cfs, rowCount);
-        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
-        long beforeSize = sstables.iterator().next().onDiskLength();
+        LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+        long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/10;
-        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, sstables, sstables, sstableSize, 0, false, OperationType.COMPACTION);
-        int rows = compact(cfs, sstables, writer);
+        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION);
+        int rows = compact(cfs, txn, writer);
         assertEquals(10, cfs.getSSTables().size());
         assertEquals(rowCount, rows);
         validateData(cfs, rowCount);
@@ -118,10 +118,10 @@ public class CompactionAwareWriterTest
         cfs.disableAutoCompaction();
         int rowCount = 10000;
         populate(cfs, rowCount);
-        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
-        long beforeSize = sstables.iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, sstables, sstables, OperationType.COMPACTION, 0);
-        int rows = compact(cfs, sstables, writer);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+        long beforeSize = txn.originals().iterator().next().onDiskLength();
+        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0);
+        int rows = compact(cfs, txn, writer);
         long expectedSize = beforeSize / 2;
         List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables());
 
@@ -154,11 +154,11 @@ public class CompactionAwareWriterTest
         int rowCount = 20000;
         int targetSSTableCount = 50;
         populate(cfs, rowCount);
-        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
-        long beforeSize = sstables.iterator().next().onDiskLength();
+        LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+        long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/targetSSTableCount;
-        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, sstables, sstables, sstableSize, false, OperationType.COMPACTION);
-        int rows = compact(cfs, sstables, writer);
+        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION);
+        int rows = compact(cfs, txn, writer);
         assertEquals(targetSSTableCount, cfs.getSSTables().size());
         int [] levelCounts = new int[5];
         assertEquals(rowCount, rows);
@@ -175,13 +175,13 @@ public class CompactionAwareWriterTest
         cfs.truncateBlocking();
     }
 
-    private int compact(ColumnFamilyStore cfs, Set<SSTableReader> sstables, CompactionAwareWriter writer)
+    private int compact(ColumnFamilyStore cfs, LifecycleTransaction txn, CompactionAwareWriter writer)
     {
-        assert sstables.size() == 1;
+        assert txn.originals().size() == 1;
         int rowsWritten = 0;
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(txn.originals()))
         {
-            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+            CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(System.currentTimeMillis()));
             ISSTableScanner scanner = scanners.scanners.get(0);
             while(scanner.hasNext())
             {
@@ -191,7 +191,6 @@ public class CompactionAwareWriterTest
             }
         }
         Collection<SSTableReader> newSSTables = writer.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newSSTables, OperationType.COMPACTION);
         return rowsWritten;
     }
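
Note the deleted markCompactedSSTablesReplaced call: with the writers now constructed over a LifecycleTransaction, publishing the new sstables and retiring the old ones appears to happen inside writer.finish() rather than as a separate Tracker call. Roughly (a sketch, not part of the patch; appending the merged rows is elided):

    import java.util.Collection;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
    import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class WriterSketch
    {
        static Collection<SSTableReader> compactAll(ColumnFamilyStore cfs)
        {
            LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
            CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION);
            // ... feed the merged rows from the compaction scanners to the writer here ...
            // per this patch, finish() handles the Tracker swap that
            // markCompactedSSTablesReplaced used to perform explicitly
            return writer.finish();
        }
    }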
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index f1d016b..64e4465 100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -324,10 +324,10 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
         Thread.sleep(2000);
         AbstractCompactionTask t = dtcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
         assertNotNull(t);
-        assertEquals(1, Iterables.size(t.sstables));
-        SSTableReader sstable = t.sstables.iterator().next();
+        assertEquals(1, Iterables.size(t.transaction.originals()));
+        SSTableReader sstable = t.transaction.originals().iterator().next();
         assertEquals(sstable, expiredSSTable);
-        cfs.getDataTracker().unmarkCompacting(cfs.getSSTables());
+        t.transaction.abort();
     }
 
 }
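
The same ownership shift closes out the patch: a compaction task now carries the LifecycleTransaction for its inputs, so a test that declines to run the task aborts the transaction instead of reaching into the tracker to unmark compacting. In sketch form (not part of the patch):

    import org.apache.cassandra.db.compaction.AbstractCompactionTask;
    import org.apache.cassandra.db.compaction.DateTieredCompactionStrategy;

    final class TaskAbortSketch
    {
        static void peekThenDecline(DateTieredCompactionStrategy dtcs, int gcBefore)
        {
            AbstractCompactionTask t = dtcs.getNextBackgroundTask(gcBefore);
            if (t == null)
                return; // nothing eligible
            // inspect t.transaction.originals() as the test above does, then:
            t.transaction.abort(); // releases the compacting marks the task acquired
        }
    }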