Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/24 18:32:13 UTC

[4/5] cassandra git commit: Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index ba85eef..14cb795 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -52,17 +51,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
 
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
+        this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
     }
 
     @SuppressWarnings("resource")
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
     {
         super(cfs, txn, nonExpiredSSTables, false);
         this.allSSTables = txn.originals();
-        totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+        totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         double[] potentialRatios = new double[20];
         double currentRatio = 1;
         for (int i = 0; i < potentialRatios.length; i++)
@@ -86,13 +85,14 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
-                                                                            currentPartitionsToWrite,
-                                                                            minRepairedAt,
-                                                                            cfs.metadata,
-                                                                            cfs.partitioner,
-                                                                            new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                                            SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                    currentPartitionsToWrite,
+                                                    minRepairedAt,
+                                                    cfs.metadata,
+                                                    cfs.partitioner,
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
 
         sstableWriter.switchWriter(writer);
         logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
@@ -109,13 +109,14 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
             @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
-                                                                                currentPartitionsToWrite,
-                                                                                minRepairedAt,
-                                                                                cfs.metadata,
-                                                                                cfs.partitioner,
-                                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                                                SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                        currentPartitionsToWrite,
+                                                        minRepairedAt,
+                                                        cfs.metadata,
+                                                        cfs.partitioner,
+                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                        txn);
             sstableWriter.switchWriter(writer);
             logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
         }

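The constructor change above means callers no longer pass an OperationType explicitly; the writer now
derives it from the LifecycleTransaction via txn.opType(). A minimal sketch of an updated call site,
assuming the compacting readers were already marked through the tracker (the surrounding variables here
are illustrative only):

    // sketch: the transaction carries the operation type, so it is no longer passed separately
    LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
    SplittingSizeTieredCompactionWriter writer =
        new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
    // internally the writer now calls cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())
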
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index 536e13c..2b94d7a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -23,6 +23,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
 
 import static com.google.common.base.Predicates.*;
 import static com.google.common.collect.Iterables.any;
@@ -70,6 +71,16 @@ class Helpers
      * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
      * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
      */
+    static void setupKeyCache(Iterable<SSTableReader> readers)
+    {
+        for (SSTableReader reader : readers)
+            reader.setupKeyCache();
+    }
+
+    /**
+     * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
+     * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
+     */
     static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate)
     {
         for (SSTableReader reader : readers)
@@ -105,18 +116,51 @@ class Helpers
             assert !reader.isReplaced();
     }
 
-    /**
-     * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
-     * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
-     */
-    static Throwable markObsolete(Tracker tracker, Iterable<SSTableReader> readers, Throwable accumulate)
+    static Throwable markObsolete(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+    {
+        if (obsoletions == null || obsoletions.isEmpty())
+            return accumulate;
+
+        for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+        {
+            try
+            {
+                obsoletion.reader.markObsolete(obsoletion.tidier);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLogs txnLogs, List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
     {
         for (SSTableReader reader : readers)
         {
             try
             {
-                boolean firstToCompact = reader.markObsolete(tracker);
-                assert firstToCompact : reader + " was already marked compacted";
+                obsoletions.add(new TransactionLogs.Obsoletion(reader, txnLogs.obsoleted(reader)));
+            }
+            catch (Throwable t)
+            {
+                accumulate = Throwables.merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
+
+    static Throwable abortObsoletion(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+    {
+        if (obsoletions == null || obsoletions.isEmpty())
+            return accumulate;
+
+        for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+        {
+            try
+            {
+                obsoletion.tidier.abort();
             }
             catch (Throwable t)
             {

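The three helpers above form a two-phase protocol: prepareForObsoletion registers a tidier per reader
without any visible effect, markObsolete is the irreversible step, and abortObsoletion releases the
tidiers if the transaction fails. A minimal sketch of the intended sequence (readers and txnLogs are
assumed to be supplied by the caller, within the same package):

    List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
    Throwable fail = Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null);
    if (fail == null)
        fail = Helpers.markObsolete(obsoletions, null);     // commit path: point of no return
    else
        fail = Helpers.abortObsoletion(obsoletions, fail);  // failure path: undo the registration
    Throwables.maybeFail(fail);                             // rethrow once, at the end
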
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index e14e2a1..b743633 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.lifecycle;
 
+import java.io.File;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -27,7 +28,9 @@ import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -44,12 +47,18 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.concurrent.Refs.release;
 import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
 
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, for correct behaviour its commit MUST occur before
+ * any others, since it may legitimately fail. This is consistent with the Transactional API, which permits one failing
+ * action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit() phase only take
+ * actions that can be rolled back.
+ */
 public class LifecycleTransaction extends Transactional.AbstractTransactional
 {
     private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
 
     /**
-     * a class that represents accumulated modifications to the Tracker.
+     * A class that represents accumulated modifications to the Tracker; it
      * has two instances, one containing modifications that are "staged" (i.e. invisible)
      * and one containing those "logged" that have been made visible through a call to checkpoint()
      */
@@ -86,7 +95,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     }
 
     public final Tracker tracker;
-    private final OperationType operationType;
+    // The transaction logs keep track of new and old sstable files
+    private final TransactionLogs transactionLogs;
     // the original readers this transaction was opened over, and that it guards
     // (no other transactions may operate over these readers concurrently)
     private final Set<SSTableReader> originals = new HashSet<>();
@@ -95,13 +105,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
     // same version of a reader. potentially a dangerous property if there are reference counting bugs
     // as they won't be caught until the transaction's lifespan is over.
-    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());
+    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<>());
 
     // changes that have been made visible
     private final State logged = new State();
     // changes that are pending
     private final State staged = new State();
 
+    // the tidiers and their readers, to be used for marking readers obsoleted during a commit
+    private List<TransactionLogs.Obsoletion> obsoletions;
+
     /**
      * construct a Transaction for use in an offline operation
      */
@@ -122,10 +135,33 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         return new LifecycleTransaction(dummy, operationType, readers);
     }
 
+    /**
+     * construct an empty Transaction with no existing readers
+     */
+    public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
+    {
+        Tracker dummy = new Tracker(null, false);
+        return new LifecycleTransaction(dummy, new TransactionLogs(operationType, metadata, dummy), Collections.emptyList());
+    }
+
+    /**
+     * construct an empty Transaction with no existing readers
+     */
+    public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
+    {
+        Tracker dummy = new Tracker(null, false);
+        return new LifecycleTransaction(dummy, new TransactionLogs(operationType, operationFolder, dummy), Collections.emptyList());
+    }
+
     LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
     {
+        this(tracker, new TransactionLogs(operationType, getMetadata(tracker, readers), tracker), readers);
+    }
+
+    LifecycleTransaction(Tracker tracker, TransactionLogs transactionLogs, Iterable<SSTableReader> readers)
+    {
         this.tracker = tracker;
-        this.operationType = operationType;
+        this.transactionLogs = transactionLogs;
         for (SSTableReader reader : readers)
         {
             originals.add(reader);
@@ -134,6 +170,36 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         }
     }
 
+    private static CFMetaData getMetadata(Tracker tracker, Iterable<SSTableReader> readers)
+    {
+        if (tracker.cfstore != null)
+            return tracker.cfstore.metadata;
+
+        for (SSTableReader reader : readers)
+        {
+            if (reader.metadata != null)
+                return reader.metadata;
+        }
+
+        assert false : "Expected cfstore or at least one reader with metadata";
+        return null;
+    }
+
+    public TransactionLogs logs()
+    {
+        return transactionLogs;
+    }
+
+    public OperationType opType()
+    {
+        return transactionLogs.getType();
+    }
+
+    public UUID opId()
+    {
+        return transactionLogs.getId();
+    }
+
     public void doPrepare()
     {
         // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
@@ -141,6 +207,11 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
         // it may break this use case, and care is needed
         checkpoint();
+
+        // prepare to obsolete the compacted readers, but only those that were part of the original set,
+        // since readers that are not originals are early-open readers that share the same descriptor as the final ones
+        maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLogs, obsoletions = new ArrayList<>(), null));
+        transactionLogs.prepareToCommit();
     }
 
     /**
@@ -149,16 +220,23 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     public Throwable doCommit(Throwable accumulate)
     {
         assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
-
         logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
 
+        // accumulate must be null if we have been used correctly, so fail immediately if it is not
+        maybeFail(accumulate);
+
+        // transaction log commit failure means we must abort; safe commit is not possible
+        maybeFail(transactionLogs.commit(null));
+
         // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
         // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
         // and notification status for the obsolete and new files
-        accumulate = markObsolete(tracker, logged.obsolete, accumulate);
+
+        accumulate = markObsolete(obsoletions, accumulate);
         accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
         accumulate = release(selfRefs(logged.obsolete), accumulate);
-        accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLogs.getType(), accumulate);
+
         return accumulate;
     }
 
@@ -170,15 +248,20 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         if (logger.isDebugEnabled())
             logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
 
+        accumulate = abortObsoletion(obsoletions, accumulate);
+
         if (logged.isEmpty() && staged.isEmpty())
-            return accumulate;
+            return transactionLogs.abort(accumulate);
 
         // mark obsolete all readers that are not versions of those present in the original set
         Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
         logger.debug("Obsoleting {}", obsolete);
-        // we don't pass the tracker in for the obsoletion, since these readers have never been notified externally
-        // nor had their size accounting affected
-        accumulate = markObsolete(null, obsolete, accumulate);
+
+        accumulate = prepareForObsoletion(obsolete, transactionLogs, obsoletions = new ArrayList<>(), accumulate);
+        // it's safe to abort even if committed (see maybeFail in doCommit() above); in this case it will just report
+        // a failure to abort, which is useful information to have for debugging
+        accumulate = transactionLogs.abort(accumulate);
+        accumulate = markObsolete(obsoletions, accumulate);
 
         // replace all updated readers with a version restored to its original state
         accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
@@ -189,6 +272,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
         // or is still in its original form (so left as is); in either case no extra action is needed
         accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
+
         logged.clear();
         staged.clear();
         return accumulate;
@@ -270,8 +354,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     }
 
     /**
-     * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
-     * but on checkpoint() the reader will be removed from the live set
+     * mark this reader for obsoletion: on checkpoint() the reader will be removed from the live set
      */
     public void obsolete(SSTableReader reader)
     {
@@ -312,8 +395,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
      */
     private Iterable<SSTableReader> fresh()
     {
-        return filterOut(staged.update,
-                         originals, logged.update);
+        return filterOut(staged.update, originals, logged.update);
     }
 
     /**
@@ -332,14 +414,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     private List<SSTableReader> restoreUpdatedOriginals()
     {
         Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
-        return ImmutableList.copyOf(transform(torestore,
-                                              new Function<SSTableReader, SSTableReader>()
-                                              {
-                                                  public SSTableReader apply(SSTableReader reader)
-                                                  {
-                                                      return current(reader).cloneWithNewStart(reader.first, null);
-                                                  }
-                                              }));
+        return ImmutableList.copyOf(transform(torestore, (reader) -> current(reader).cloneWithNewStart(reader.first, null)));
     }
 
     /**
@@ -415,7 +490,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
             originals.remove(reader);
             marked.remove(reader);
         }
-        return new LifecycleTransaction(tracker, operationType, readers);
+        return new LifecycleTransaction(tracker, transactionLogs.getType(), readers);
     }
 
     /**
@@ -446,6 +521,31 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         return getFirst(originals, null);
     }
 
+    public void trackNew(SSTable table)
+    {
+        transactionLogs.trackNew(table);
+    }
+
+    public void untrackNew(SSTable table)
+    {
+        transactionLogs.untrackNew(table);
+    }
+
+    public static void removeUnfinishedLeftovers(CFMetaData metadata)
+    {
+        TransactionLogs.removeUnfinishedLeftovers(metadata);
+    }
+
+    public static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
+    {
+        return TransactionLogs.getTemporaryFiles(metadata, folder);
+    }
+
+    public static Set<File> getLogFiles(CFMetaData metadata)
+    {
+        return TransactionLogs.getLogFiles(metadata);
+    }
+
     // a class representing the current state of the reader within this transaction, encoding the actions both logged
     // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
     // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
@@ -453,7 +553,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     @VisibleForTesting
     public static class ReaderState
     {
-        public static enum Action
+        public enum Action
         {
             UPDATED, OBSOLETED, NONE;
             public static Action get(boolean updated, boolean obsoleted)

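The new offline() factories plus the trackNew()/untrackNew() pass-throughs let code that writes sstables
outside a live Tracker still record its files in the transaction logs, so a crash mid-write leaves
nothing behind after startup cleanup. A minimal sketch of the intended shape (writeSSTable is a
hypothetical placeholder for whatever produces the reader; real call sites register each new file via
trackNew as they create it, and the WRITE operation type is assumed here):

    LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
    try
    {
        SSTableReader reader = writeSSTable(txn);  // hypothetical; calls txn.trackNew(...) per new file
        txn.update(reader, false);
        txn.finish();                              // commit: the NEW log is discarded, the files are kept
    }
    catch (Throwable t)
    {
        txn.abort();                               // abort: files recorded in the NEW log are deleted
    }
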
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index f1c4685..241eb4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static com.google.common.base.Predicates.and;
@@ -162,9 +163,11 @@ public class Tracker
                 accumulate = merge(accumulate, t);
             }
         }
+
         StorageMetrics.load.inc(add - subtract);
         cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
-        // we don't subtract from total until the sstable is deleted
+
+        // we don't subtract from total until the sstable is deleted, see TransactionLogs.SSTableTidier
         cfstore.metric.totalDiskSpaceUsed.inc(add);
         return accumulate;
     }
@@ -224,29 +227,47 @@ public class Tracker
      */
     public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
     {
-        Pair<View, View> result = apply(new Function<View, View>()
+        try (TransactionLogs txnLogs = new TransactionLogs(operationType, cfstore.metadata, this))
         {
-            public View apply(View view)
-            {
+            Pair<View, View> result = apply(view -> {
                 Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
                 return updateLiveSet(toremove, emptySet()).apply(view);
-            }
-        });
+            });
 
-        Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
-        assert Iterables.all(removed, remove);
+            Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
+            assert Iterables.all(removed, remove);
 
-        if (!removed.isEmpty())
+            // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
+            // to complete the instructions given to it
+            List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+            accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+            try
+            {
+                txnLogs.finish();
+                if (!removed.isEmpty())
+                {
+                    accumulate = markObsolete(obsoletions, accumulate);
+                    accumulate = updateSizeTracking(removed, emptySet(), accumulate);
+                    accumulate = release(selfRefs(removed), accumulate);
+                    // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
+                    accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.getType(), accumulate);
+                }
+            }
+            catch (Throwable t)
+            {
+                accumulate = abortObsoletion(obsoletions, accumulate);
+                accumulate = Throwables.merge(accumulate, t);
+            }
+        }
+        catch (Throwable t)
         {
-            // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
-            accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate);
-            accumulate = updateSizeTracking(removed, emptySet(), accumulate);
-            accumulate = markObsolete(this, removed, accumulate);
-            accumulate = release(selfRefs(removed), accumulate);
+            accumulate = Throwables.merge(accumulate, t);
         }
+
         return accumulate;
     }
 
+
     /**
      * Removes every SSTable in the directory from the Tracker's view.
      * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
@@ -370,7 +391,6 @@ public class Tracker
         sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
     }
 
-
     // NOTIFICATION
 
     Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)

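The rewritten dropSSTables follows the Throwable-accumulate idiom called out in the comment above: each
step catches its own exceptions and merges them into a single chain, so every step still runs even if an
earlier one failed, and the caller fails exactly once at the end. The shape of the pattern, with
hypothetical step methods:

    Throwable accumulate = null;
    accumulate = stepOne(accumulate);    // hypothetical; never throws, merges failures into the chain
    accumulate = stepTwo(accumulate);    // still runs even if stepOne recorded a failure
    Throwables.maybeFail(accumulate);    // rethrows the first failure with the others suppressed
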
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
new file mode 100644
index 0000000..ab6c72a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Blocker;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
+ * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
+ * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
+ * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
+ *
+ * A class that tracks sstable files involved in a transaction across sstables:
+ * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
+ *
+ * Two log files, NEW and OLD, contain new and old sstable files respectively. The log files also track each
+ * other by referencing each other's path in their contents.
+ *
+ * If the transaction finishes successfully:
+ * - the OLD transaction file is deleted along with its contents; this includes the NEW transaction file.
+ *   Before deleting, we must first let the SSTableTidier instances run for any old readers that are being obsoleted
+ *   (marked as compacted) by the transaction, see LifecycleTransaction
+ *
+ * If the transaction is aborted:
+ * - the NEW transaction file and its contents are deleted; this includes the OLD transaction file
+ *
+ * On start-up:
+ * - If we find a NEW transaction file, it means the transaction did not complete and we delete the NEW file and its contents
+ * - If we find an OLD transaction file but not a NEW file, it means the transaction must have completed and so we delete
+ *   all the contents of the OLD file, if they still exist, and the OLD file itself.
+ *
+ * See CASSANDRA-7066 for full details.
+ */
+public class TransactionLogs extends Transactional.AbstractTransactional implements Transactional
+{
+    private static final Logger logger = LoggerFactory.getLogger(TransactionLogs.class);
+
+    /**
+     * A single transaction log file, either NEW or OLD.
+     */
+    final static class TransactionFile
+    {
+        static String EXT = ".log";
+        static char SEP = '_';
+        static String REGEX_STR = String.format("^(.*)_(.*)_(%s|%s)%s$", Type.NEW.txt, Type.OLD.txt, EXT);
+        static Pattern REGEX = Pattern.compile(REGEX_STR); //(opname)_(id)_(new|old).data
+
+        public enum Type
+        {
+            NEW (0, "new"),
+            OLD (1, "old");
+
+            public final int idx;
+            public final String txt;
+
+            Type(int idx, String txt)
+            {
+                this.idx = idx;
+                this.txt = txt;
+            }
+        }
+
+        public final Type type;
+        public final File file;
+        public final TransactionData parent;
+        public final Set<String> lines = new HashSet<>();
+
+        public TransactionFile(Type type, TransactionData parent)
+        {
+            this.type = type;
+            this.file = new File(parent.getFileName(type));
+            this.parent = parent;
+
+            if (exists())
+                lines.addAll(FileUtils.readLines(file));
+        }
+
+        public boolean add(SSTable table)
+        {
+            return add(table.descriptor.baseFilename());
+        }
+
+        private boolean add(String path)
+        {
+            String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), path);
+            if (lines.contains(relativePath))
+                return false;
+
+            lines.add(relativePath);
+            FileUtils.append(file, relativePath);
+            return true;
+        }
+
+        public void remove(SSTable table)
+        {
+            String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
+            assert lines.contains(relativePath) : String.format("%s is not tracked by %s", relativePath, file);
+
+            lines.remove(relativePath);
+            delete(relativePath);
+        }
+
+        public boolean contains(SSTable table)
+        {
+            String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
+            return lines.contains(relativePath);
+        }
+
+        private void deleteContents()
+        {
+            deleteOpposite();
+
+            // we sync the parent file descriptor between opposite log deletion and
+            // contents deletion to ensure there is a happens-before edge between them
+            parent.sync();
+
+            lines.forEach(line -> delete(line));
+            lines.clear();
+        }
+
+        private void deleteOpposite()
+        {
+            Type oppositeType = type == Type.NEW ? Type.OLD : Type.NEW;
+            String oppositeFile = FileUtils.getRelativePath(parent.getParentFolder(), parent.getFileName(oppositeType));
+            assert lines.contains(oppositeFile) : String.format("Could not find %s amongst lines", oppositeFile);
+
+            delete(oppositeFile);
+            lines.remove(oppositeFile);
+        }
+
+        private void delete(String relativePath)
+        {
+            getTrackedFiles(relativePath).forEach(file -> TransactionLogs.delete(file));
+        }
+
+        public Set<File> getTrackedFiles()
+        {
+            Set<File> ret = new HashSet<>();
+            FileUtils.readLines(file).forEach(line -> ret.addAll(getTrackedFiles(line)));
+            ret.add(file);
+            return ret;
+        }
+
+        private List<File> getTrackedFiles(String relativePath)
+        {
+            List<File> ret = new ArrayList<>();
+            File file = new File(StringUtils.join(parent.getParentFolder(), File.separator, relativePath));
+            if (file.exists())
+                ret.add(file);
+            else
+                ret.addAll(Arrays.asList(new File(parent.getParentFolder()).listFiles((dir, name) -> name.startsWith(relativePath))));
+
+            return ret;
+        }
+
+        public void delete(boolean deleteContents)
+        {
+            assert file.exists() : String.format("Expected %s to exist", file);
+
+            if (deleteContents)
+                deleteContents();
+
+            // we sync the parent file descriptor between contents and log deletion
+            // to ensure there is a happens-before edge between them
+            parent.sync();
+
+            TransactionLogs.delete(file);
+        }
+
+        public boolean exists()
+        {
+            return file.exists();
+        }
+    }
+
+    /**
+     * We split the transaction data from the behavior because we need
+     * to reconstruct any left-overs and clean them up, as well as work
+     * out which files are temporary. So for these cases we don't want the full
+     * transactional behavior, plus it's handy for the TransactionTidier.
+     */
+    final static class TransactionData implements AutoCloseable
+    {
+        private final OperationType opType;
+        private final UUID id;
+        private final File folder;
+        private final TransactionFile[] files;
+        private int folderDescriptor;
+        private boolean succeeded;
+
+        static TransactionData make(File logFile)
+        {
+            Matcher matcher = TransactionFile.REGEX.matcher(logFile.getName());
+            assert matcher.matches();
+
+            OperationType operationType = OperationType.fromFileName(matcher.group(1));
+            UUID id = UUID.fromString(matcher.group(2));
+
+            return new TransactionData(operationType, logFile.getParentFile(), id);
+        }
+
+        TransactionData(OperationType opType, File folder, UUID id)
+        {
+            this.opType = opType;
+            this.id = id;
+            this.folder = folder;
+            this.files = new TransactionFile[TransactionFile.Type.values().length];
+            for (TransactionFile.Type t : TransactionFile.Type.values())
+                this.files[t.idx] = new TransactionFile(t, this);
+
+            this.folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
+            this.succeeded = !newLog().exists() && oldLog().exists();
+        }
+
+        public void succeeded(boolean succeeded)
+        {
+            this.succeeded = succeeded;
+        }
+
+        public void close()
+        {
+            if (folderDescriptor > 0)
+            {
+                CLibrary.tryCloseFD(folderDescriptor);
+                folderDescriptor = -1;
+            }
+        }
+
+        void crossReference()
+        {
+            newLog().add(oldLog().file.getPath());
+            oldLog().add(newLog().file.getPath());
+        }
+
+        void sync()
+        {
+            if (folderDescriptor > 0)
+                CLibrary.trySync(folderDescriptor);
+        }
+
+        TransactionFile newLog()
+        {
+            return files[TransactionFile.Type.NEW.idx];
+        }
+
+        TransactionFile oldLog()
+        {
+            return files[TransactionFile.Type.OLD.idx];
+        }
+
+        OperationType getType()
+        {
+            return opType;
+        }
+
+        UUID getId()
+        {
+            return id;
+        }
+
+        Throwable removeUnfinishedLeftovers(Throwable accumulate)
+        {
+            try
+            {
+                if (succeeded)
+                    oldLog().delete(true);
+                else
+                    newLog().delete(true);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+
+            return accumulate;
+        }
+
+        Set<File> getTemporaryFiles()
+        {
+            sync();
+
+            if (newLog().exists())
+                return newLog().getTrackedFiles();
+            else
+                return oldLog().getTrackedFiles();
+        }
+
+        String getFileName(TransactionFile.Type type)
+        {
+            String fileName = StringUtils.join(opType.fileName,
+                                               TransactionFile.SEP,
+                                               id.toString(),
+                                               TransactionFile.SEP,
+                                               type.txt,
+                                               TransactionFile.EXT);
+            return StringUtils.join(folder, File.separator, fileName);
+        }
+
+        String getParentFolder()
+        {
+            return folder.getParent();
+        }
+
+        static boolean isLogFile(String name)
+        {
+            return TransactionFile.REGEX.matcher(name).matches();
+        }
+    }
+
+    private final Tracker tracker;
+    private final TransactionData data;
+    private final Ref<TransactionLogs> selfRef;
+    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+    // Additionally, we need to make sure to delete the data file first, so on restart the others
+    // will be recognized as GCable.
+    private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
+    private static final Blocker blocker = new Blocker();
+
+    TransactionLogs(OperationType opType, CFMetaData metadata)
+    {
+        this(opType, metadata, null);
+    }
+
+    TransactionLogs(OperationType opType, CFMetaData metadata, Tracker tracker)
+    {
+        this(opType, new Directories(metadata), tracker);
+    }
+
+    TransactionLogs(OperationType opType, Directories directories, Tracker tracker)
+    {
+        this(opType, directories.getDirectoryForNewSSTables(), tracker);
+    }
+
+    TransactionLogs(OperationType opType, File folder, Tracker tracker)
+    {
+        this.tracker = tracker;
+        this.data = new TransactionData(opType,
+                                        Directories.getTransactionsDirectory(folder),
+                                        UUIDGen.getTimeUUID());
+        this.selfRef = new Ref<>(this, new TransactionTidier(data));
+
+        data.crossReference();
+        if (logger.isDebugEnabled())
+            logger.debug("Created transaction logs with id {}", data.id);
+    }
+
+    /**
+     * Track a reader as new.
+     **/
+    void trackNew(SSTable table)
+    {
+        if (!data.newLog().add(table))
+            throw new IllegalStateException(table + " is already tracked as new");
+    }
+
+    /**
+     * Stop tracking a reader as new.
+     */
+    void untrackNew(SSTable table)
+    {
+        data.newLog().remove(table);
+    }
+
+    /**
+     * Schedule a reader for deletion as soon as it is fully unreferenced and the transaction
+     * has been committed.
+     */
+    SSTableTidier obsoleted(SSTableReader reader)
+    {
+        if (data.newLog().contains(reader))
+        {
+            if (data.oldLog().contains(reader))
+                throw new IllegalArgumentException(reader + " is tracked in both the NEW and OLD logs");
+
+            return new SSTableTidier(reader, true, this);
+        }
+
+        if (!data.oldLog().add(reader))
+            throw new IllegalStateException(reader + " is already tracked as obsolete");
+
+        if (tracker != null)
+            tracker.notifyDeleting(reader);
+
+        return new SSTableTidier(reader, false, this);
+    }
+
+    OperationType getType()
+    {
+        return data.getType();
+    }
+
+    UUID getId()
+    {
+        return data.getId();
+    }
+
+    @VisibleForTesting
+    String getDataFolder()
+    {
+        return data.getParentFolder();
+    }
+
+    @VisibleForTesting
+    String getLogsFolder()
+    {
+        return StringUtils.join(getDataFolder(), File.separator, Directories.TRANSACTIONS_SUBDIR);
+    }
+
+    @VisibleForTesting
+    TransactionData getData()
+    {
+        return data;
+    }
+
+    private static void delete(File file)
+    {
+        try
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Deleting {}", file);
+
+            Files.delete(file.toPath());
+        }
+        catch (NoSuchFileException e)
+        {
+            logger.warn("Unable to delete {} as it does not exist", file);
+        }
+        catch (IOException e)
+        {
+            logger.error("Unable to delete {}", file, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * The transaction tidier.
+     *
+     * When the transaction reference is fully released we try to delete all the obsolete files
+     * depending on the transaction result.
+     */
+    private static class TransactionTidier implements RefCounted.Tidy, Runnable
+    {
+        private final TransactionData data;
+
+        public TransactionTidier(TransactionData data)
+        {
+            this.data = data;
+        }
+
+        public void tidy() throws Exception
+        {
+            run();
+        }
+
+        public String name()
+        {
+            return data.id.toString();
+        }
+
+        public void run()
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Removing files for transaction {}", name());
+
+            Throwable err = data.removeUnfinishedLeftovers(null);
+
+            if (err != null)
+            {
+                logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
+                failedDeletions.add(this);
+            }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Closing file transaction {}", name());
+                data.close();
+            }
+        }
+    }
+
+    static class Obsoletion
+    {
+        final SSTableReader reader;
+        final SSTableTidier tidier;
+
+        public Obsoletion(SSTableReader reader, SSTableTidier tidier)
+        {
+            this.reader = reader;
+            this.tidier = tidier;
+        }
+    }
+
+    /**
+     * The SSTableReader tidier. When a reader is fully released and no longer referenced
+     * by anyone, we run this. It keeps a reference to the parent transaction and releases
+     * it when done, so that the final transaction cleanup can run when all obsolete readers
+     * are released.
+     */
+    public static class SSTableTidier implements Runnable
+    {
+        // must not retain a reference to the SSTableReader, else leak detection cannot kick in
+        private final Descriptor desc;
+        private final long sizeOnDisk;
+        private final Tracker tracker;
+        private final boolean wasNew;
+        private final Ref<TransactionLogs> parentRef;
+
+        public SSTableTidier(SSTableReader referent, boolean wasNew, TransactionLogs parent)
+        {
+            this.desc = referent.descriptor;
+            this.sizeOnDisk = referent.bytesOnDisk();
+            this.tracker = parent.tracker;
+            this.wasNew = wasNew;
+            this.parentRef = parent.selfRef.tryRef();
+        }
+
+        public void run()
+        {
+            blocker.ask();
+
+            SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+
+            try
+            {
+                // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
+                File datafile = new File(desc.filenameFor(Component.DATA));
+
+                delete(datafile);
+                // let the remainder be cleaned up by delete
+                SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
+            }
+            catch (Throwable t)
+            {
+                logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
+                failedDeletions.add(this);
+                return;
+            }
+
+            if (tracker != null && !wasNew)
+                tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
+
+            // release the reference to the parent so that all of the transaction files can be released
+            parentRef.release();
+        }
+
+        public void abort()
+        {
+            parentRef.release();
+        }
+    }
+
+    /**
+     * Retry all deletions that failed the first time around (presumably because the sstable was still mmap'd).
+     * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
+     */
+    public static void rescheduleFailedDeletions()
+    {
+        Runnable task;
+        while (null != (task = failedDeletions.poll()))
+            ScheduledExecutors.nonPeriodicTasks.submit(task);
+    }
+
+    /**
+     * Deletions run on the nonPeriodicTasks executor (both failedDeletions and the global tidiers in SSTableReader),
+     * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
+     */
+    public static void waitForDeletions()
+    {
+        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
+        }, 0, TimeUnit.MILLISECONDS));
+    }
+
+    @VisibleForTesting
+    public static void pauseDeletions(boolean stop)
+    {
+        blocker.block(stop);
+    }
+
+    private Throwable complete(Throwable accumulate)
+    {
+        try
+        {
+            try
+            {
+                if (data.succeeded)
+                    data.newLog().delete(false);
+                else
+                    data.oldLog().delete(false);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+
+            accumulate = selfRef.ensureReleased(accumulate);
+            return accumulate;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to complete file transaction {}", getId(), t);
+            return Throwables.merge(accumulate, t);
+        }
+    }
+
+    protected Throwable doCommit(Throwable accumulate)
+    {
+        data.succeeded(true);
+        return complete(accumulate);
+    }
+
+    protected Throwable doAbort(Throwable accumulate)
+    {
+        data.succeeded(false);
+        return complete(accumulate);
+    }
+
+    protected void doPrepare() { }
+
+    /**
+     * Called on startup to scan existing folders for any unfinished leftovers of
+     * operations that were ongoing when the process exited.
+     *
+     * We check if the new transaction file exists first, and if so we clean it up
+     * along with its contents, which include the old file; otherwise, if only the old file exists,
+     * it means the operation has completed and we clean up only the old file and its contents.
+     */
+    static void removeUnfinishedLeftovers(CFMetaData metadata)
+    {
+        Throwable accumulate = null;
+        Set<UUID> ids = new HashSet<>();
+
+        for (File dir : getFolders(metadata, null))
+        {
+            File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+
+            for (File log : logs)
+            {
+                try (TransactionData data = TransactionData.make(log))
+                {
+                    // we need to check this because there are potentially 2 log files per operation
+                    if (ids.contains(data.id))
+                        continue;
+
+                    ids.add(data.id);
+                    accumulate = data.removeUnfinishedLeftovers(accumulate);
+                }
+            }
+        }
+
+        if (accumulate != null)
+            logger.error("Failed to remove unfinished transaction leftovers", accumulate);
+    }
+
+    /**
+     * Return a set of files that are temporary, that is, they are involved in
+     * a transaction that hasn't completed yet.
+     *
+     * Only return the files that exist and that are located in the folder
+     * specified as a parameter or its sub-folders.
+     */
+    static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
+    {
+        Set<File> ret = new HashSet<>();
+        Set<UUID> ids = new HashSet<>();
+
+        for (File dir : getFolders(metadata, folder))
+        {
+            File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+
+            for (File log : logs)
+            {
+                try (TransactionData data = TransactionData.make(log))
+                {
+                    // we need to check this because there are potentially 2 log files per transaction
+                    if (ids.contains(data.id))
+                        continue;
+
+                    ids.add(data.id);
+                    ret.addAll(data.getTemporaryFiles()
+                                   .stream()
+                                   .filter(file -> FileUtils.isContained(folder, file))
+                                   .collect(Collectors.toSet()));
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Return the transaction log files that currently exist for this table.
+     */
+    static Set<File> getLogFiles(CFMetaData metadata)
+    {
+        Set<File> ret = new HashSet<>();
+        for (File dir : getFolders(metadata, null))
+            ret.addAll(Arrays.asList(dir.listFiles((dir1, name) -> TransactionData.isLogFile(name))));
+
+        return ret;
+    }
+
+    /**
+     * A utility method to work out the existing transaction sub-folders
+     * either for a table, or a specific parent folder, or both.
+     */
+    private static List<File> getFolders(CFMetaData metadata, File folder)
+    {
+        List<File> ret = new ArrayList<>();
+        if (metadata != null)
+        {
+            Directories directories = new Directories(metadata);
+            ret.addAll(directories.getExistingDirectories(Directories.TRANSACTIONS_SUBDIR));
+        }
+
+        if (folder != null)
+        {
+            File opDir = Directories.getExistingDirectory(folder, Directories.TRANSACTIONS_SUBDIR);
+            if (opDir != null)
+                ret.add(opDir);
+        }
+
+        return ret;
+    }
+}

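Concretely, a compaction with a given id produces a pair of files such as compaction_<uuid>_new.log and
compaction_<uuid>_old.log in the table's transactions sub-folder (assuming "compaction" is the operation
type's fileName), each listing the tracked sstable paths plus the path of its counterpart. On startup the
cleanup runs through the public LifecycleTransaction wrappers; a minimal sketch, where folder stands for
one data directory being scanned:

    // remove leftovers of operations interrupted by the previous shutdown
    LifecycleTransaction.removeUnfinishedLeftovers(cfs.metadata);
    // anything still referenced by an in-flight transaction must not be opened as a live sstable
    Set<File> temporary = LifecycleTransaction.getTemporaryFiles(cfs.metadata, folder);
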
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 174e634..acc9141 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
 
@@ -60,18 +59,19 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
         this.formatType = type;
     }
 
-    protected SSTableWriter createWriter()
+    protected SSTableTxnWriter createWriter()
     {
-        return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+        return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
                                     0,
                                     ActiveRepairService.UNREPAIRED_SSTABLE,
+                                    0,
                                     new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
     {
         int maxGen = getNextGeneration(directory, columnFamily);
-        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP, fmt);
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, fmt);
     }
 
     private static int getNextGeneration(File directory, final String columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 5ab99e7..519f14e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -18,9 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.StringTokenizer;
+import java.util.*;
 
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Objects;
@@ -46,47 +44,35 @@ import static org.apache.cassandra.io.sstable.Component.separator;
  */
 public class Descriptor
 {
-    public static enum Type
-    {
-        TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
-        public final boolean isTemporary;
-        public final String marker;
-        Type(String marker, boolean isTemporary)
-        {
-            this.isTemporary = isTemporary;
-            this.marker = marker;
-        }
-    }
-
+    public static final String TMP_EXT = ".tmp";
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
     public final Version version;
     public final String ksname;
     public final String cfname;
     public final int generation;
-    public final Type type;
     public final SSTableFormat.Type formatType;
     private final int hashCode;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */
-    public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
+    public Descriptor(File directory, String ksname, String cfname, int generation)
     {
-        this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, temp, DatabaseDescriptor.getSSTableFormat());
+        this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, DatabaseDescriptor.getSSTableFormat());
     }
 
-    public Descriptor(File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+    public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
-        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, temp, formatType);
+        this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType);
     }
 
-    public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+    public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
-        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, temp, formatType);
+        this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType);
     }
 
-    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
     {
         assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
         this.version = version;
@@ -94,20 +80,24 @@ public class Descriptor
         this.ksname = ksname;
         this.cfname = cfname;
         this.generation = generation;
-        this.type = temp;
         this.formatType = formatType;
 
-        hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, temp, formatType);
+        hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, formatType);
     }
 
     public Descriptor withGeneration(int newGeneration)
     {
-        return new Descriptor(version, directory, ksname, cfname, newGeneration, type, formatType);
+        return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType);
     }
 
     public Descriptor withFormatType(SSTableFormat.Type newType)
     {
-        return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, type, newType);
+        return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType);
+    }
+
+    public String tmpFilenameFor(Component component)
+    {
+        return filenameFor(component) + TMP_EXT;
     }
 
     public String filenameFor(Component component)
@@ -130,8 +120,6 @@ public class Descriptor
             buff.append(ksname).append(separator);
             buff.append(cfname).append(separator);
         }
-        if (type.isTemporary)
-            buff.append(type.marker).append(separator);
         buff.append(version).append(separator);
         buff.append(generation);
         if (formatType != SSTableFormat.Type.LEGACY)
@@ -160,6 +148,37 @@ public class Descriptor
         return baseFilename() + separator + suffix;
     }
 
+    /** Return any temporary files found in the directory */
+    public List<File> getTemporaryFiles()
+    {
+        List<File> ret = new ArrayList<>();
+        File[] tmpFiles = directory.listFiles((dir, name) -> name.endsWith(TMP_EXT));
+
+        if (tmpFiles != null) // listFiles returns null if the directory does not exist
+            ret.addAll(Arrays.asList(tmpFiles));
+
+        return ret;
+    }
+
+    /**
+     *  Files obsoleted by CASSANDRA-7066:
+     *  - temporary files used to start with either tmp or tmplink
+     *  - system.compactions_in_progress sstable files
+     */
+    public static boolean isLegacyFile(String fileName)
+    {
+        return fileName.startsWith("compactions_in_progress") ||
+               fileName.startsWith("tmp"); // the "tmp" prefix also covers "tmplink"
+    }
+
+    public static boolean isValidFile(String fileName)
+    {
+        return fileName.endsWith(".db") && !isLegacyFile(fileName);
+    }
+
     /**
      * @see #fromFilename(File directory, String name)
      * @param filename The SSTable filename
@@ -222,7 +241,7 @@ public class Descriptor
         String component = skipComponent ? null : tokenStack.pop();
 
         nexttok = tokenStack.pop();
-        // generation OR Type
+        // generation OR format type
         SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
         if (!CharMatcher.DIGIT.matchesAllOf(nexttok))
         {
@@ -240,20 +259,6 @@ public class Descriptor
         if (!version.validate(nexttok))
             throw new UnsupportedOperationException("SSTable " + name + " is too old to open.  Upgrade to 2.0 first, and run upgradesstables");
 
-        // optional temporary marker
-        Type type = Descriptor.Type.FINAL;
-        nexttok = tokenStack.peek();
-        if (Descriptor.Type.TEMP.marker.equals(nexttok))
-        {
-            type = Descriptor.Type.TEMP;
-            tokenStack.pop();
-        }
-        else if (Descriptor.Type.TEMPLINK.marker.equals(nexttok))
-        {
-            type = Descriptor.Type.TEMPLINK;
-            tokenStack.pop();
-        }
-
         // ks/cf names
         String ksname, cfname;
         if (version.hasNewFileName())
@@ -285,16 +290,7 @@ public class Descriptor
         }
         assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory;
 
-        return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, type, fmt), component);
-    }
-
-    /**
-     * @param type temporary flag
-     * @return A clone of this descriptor with the given 'temporary' status.
-     */
-    public Descriptor asType(Type type)
-    {
-        return new Descriptor(version, directory, ksname, cfname, generation, type, formatType);
+        return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt), component);
     }
 
     public IMetadataSerializer getMetadataSerializer()
@@ -331,8 +327,7 @@ public class Descriptor
                        && that.generation == this.generation
                        && that.ksname.equals(this.ksname)
                        && that.cfname.equals(this.cfname)
-                       && that.formatType == this.formatType
-                       && that.type == this.type;
+                       && that.formatType == this.formatType;
     }
 
     @Override
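
With the Type enum gone, temporary state is no longer encoded inside the
filename itself: a temporary file is simply the final name plus the ".tmp"
suffix, and the old tmp/tmplink markers are only recognised as legacy. A
small illustrative sketch with a made-up path and generation (the exact
base filename depends on the sstable version and format):

    Descriptor desc = new Descriptor(new File("/var/lib/cassandra/data/ks/t"), "ks", "t", 42);
    String finalName = desc.filenameFor(Component.DATA);  // e.g. ends in "-42-big-Data.db"
    String tmpName = desc.tmpFilenameFor(Component.DATA); // the same name with ".tmp" appended

    Descriptor.isLegacyFile("tmplink-ks-t-ka-1-Data.db"); // true: pre-7066 temporary file
    Descriptor.isValidFile("ks-t-ka-1-Data.db");          // true: a .db file that is not legacy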

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 6f66fd3..e6558eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -259,11 +259,12 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         {
             for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
             {
-                // We can't change the sampling level of sstables with the old format, because the serialization format
-                // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
-                logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
                 if (!sstable.descriptor.version.hasSamplingLevel())
                 {
+                    // We can't change the sampling level of sstables with the old format, because the serialization format
+                    // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+                    logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+
                     oldFormatSSTables.add(sstable);
                     txn.cancel(sstable);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 2077152..516534d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -22,6 +22,7 @@ import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Sets;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
 
@@ -71,7 +71,7 @@ public abstract class SSTable
 
     protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
     {
-        this(descriptor, new HashSet<Component>(), metadata, partitioner);
+        this(descriptor, new HashSet<>(), metadata, partitioner);
     }
 
     protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
@@ -113,7 +113,9 @@ public abstract class SSTable
 
             FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
-        FileUtils.delete(desc.filenameFor(Component.SUMMARY));
+
+        if (components.contains(Component.SUMMARY))
+            FileUtils.delete(desc.filenameFor(Component.SUMMARY));
 
         logger.debug("Deleted {}", desc);
         return true;
@@ -150,6 +152,15 @@ public abstract class SSTable
         return descriptor.ksname;
     }
 
+    @VisibleForTesting
+    public List<String> getAllFilePaths()
+    {
+        List<String> ret = new ArrayList<>();
+        for (Component component : components)
+            ret.add(descriptor.filenameFor(component));
+        return ret;
+    }
+
     /**
      * @return Descriptor and Component pair. null if given file is not acceptable as SSTable component.
      *         If component is of unknown type, returns CUSTOM component.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
deleted file mode 100644
index f0eb67f..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Counter;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.lifecycle.Tracker;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Blocker;
-
-public class SSTableDeletingTask implements Runnable
-{
-    private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class);
-
-    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
-    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
-    // Additionally, we need to make sure to delete the data file first, so on restart the others
-    // will be recognized as GCable.
-    private static final Queue<SSTableDeletingTask> failedTasks = new ConcurrentLinkedQueue<>();
-    private static final Blocker blocker = new Blocker();
-
-    private final Descriptor desc;
-    private final Set<Component> components;
-    private final long bytesOnDisk;
-    private final Counter totalDiskSpaceUsed;
-
-    /**
-     * realDescriptor is the actual descriptor for the sstable, the descriptor inside
-     * referent can be 'faked' as FINAL for early opened files. We need the real one
-     * to be able to remove the files.
-     */
-    public SSTableDeletingTask(Descriptor realDescriptor, Set<Component> components, Counter totalDiskSpaceUsed, long bytesOnDisk)
-    {
-        this.desc = realDescriptor;
-        this.bytesOnDisk = bytesOnDisk;
-        this.totalDiskSpaceUsed = totalDiskSpaceUsed;
-        switch (desc.type)
-        {
-            case FINAL:
-                this.components = components;
-                break;
-            case TEMPLINK:
-                this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
-                break;
-            default:
-                throw new IllegalStateException();
-        }
-    }
-
-    public void schedule()
-    {
-        ScheduledExecutors.nonPeriodicTasks.submit(this);
-    }
-
-    public void run()
-    {
-        blocker.ask();
-        // If we can't successfully delete the DATA component, set the task to be retried later: see above
-        File datafile = new File(desc.filenameFor(Component.DATA));
-        if (!datafile.delete())
-        {
-            logger.error("Unable to delete {} (it will be removed on server restart; we'll also retry after GC)", datafile);
-            failedTasks.add(this);
-            return;
-        }
-        // let the remainder be cleaned up by delete
-        SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
-        if (totalDiskSpaceUsed != null)
-            totalDiskSpaceUsed.dec(bytesOnDisk);
-    }
-
-    /**
-     * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
-     * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
-     */
-    public static void rescheduleFailedTasks()
-    {
-        SSTableDeletingTask task;
-        while ( null != (task = failedTasks.poll()))
-            task.schedule();
-    }
-
-    /** for tests */
-    @VisibleForTesting
-    public static void waitForDeletions()
-    {
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-            }
-        };
-
-        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
-    }
-
-    @VisibleForTesting
-    public static void pauseDeletions(boolean stop)
-    {
-        blocker.block(stop);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b99003b..f25d3ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -78,13 +79,17 @@ public class SSTableLoader implements StreamEventHandler
 
         directory.list(new FilenameFilter()
         {
+            final Map<File, Set<File>> allTemporaryFiles = new HashMap<>();
             public boolean accept(File dir, String name)
             {
-                if (new File(dir, name).isDirectory())
+                File file = new File(dir, name);
+
+                if (file.isDirectory())
                     return false;
+
                 Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
                 Descriptor desc = p == null ? null : p.left;
-                if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
+                if (p == null || !p.right.equals(Component.DATA))
                     return false;
 
                 if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
@@ -100,6 +105,19 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 }
 
+                Set<File> temporaryFiles = allTemporaryFiles.get(dir);
+                if (temporaryFiles == null)
+                {
+                    temporaryFiles = LifecycleTransaction.getTemporaryFiles(metadata, dir);
+                    allTemporaryFiles.put(dir, temporaryFiles);
+                }
+
+                if (temporaryFiles.contains(file))
+                {
+                    outputHandler.output(String.format("Skipping temporary file %s", name));
+                    return false;
+                }
+
                 Set<Component> components = new HashSet<>();
                 components.add(Component.DATA);
                 components.add(Component.PRIMARY_INDEX);
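
A note on the lookup above: caching the temporary-file set per directory
means the transaction-log scan runs once per folder rather than once per
candidate file. On Java 8 the same memoisation could be written more
compactly; a minimal equivalent sketch, not part of the patch:

    Set<File> temporaryFiles = allTemporaryFiles.computeIfAbsent(
        dir, d -> LifecycleTransaction.getTemporaryFiles(metadata, d));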

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8580644..4eebb0c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,11 +17,9 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.File;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Runnables;
 
 import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
@@ -69,6 +67,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
 
     private final List<SSTableWriter> writers = new ArrayList<>();
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
+    private boolean keepOriginals; // true if we do not want to obsolete the originals
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -95,9 +94,16 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         this.cfs = cfs;
         this.maxAge = maxAge;
         this.isOffline = isOffline;
+        this.keepOriginals = false;
         this.preemptiveOpenInterval = preemptiveOpenInterval;
     }
 
+    public SSTableRewriter keepOriginals(boolean val)
+    {
+        keepOriginals = val;
+        return this;
+    }
+
     private static long calculateOpenInterval(boolean shouldOpenEarly)
     {
         long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -189,6 +195,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
     {
         for (SSTableWriter writer : writers)
             accumulate = writer.commit(accumulate);
+
         accumulate = transaction.commit(accumulate);
         return accumulate;
     }
@@ -280,17 +287,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
             if (writer != null)
             {
                 writer.abort();
+
+                transaction.untrackNew(writer);
                 writers.remove(writer);
             }
             writer = newWriter;
+
             return;
         }
 
-        SSTableReader reader = null;
         if (preemptiveOpenInterval != Long.MAX_VALUE)
         {
             // we leave it as a tmp file, but we open it and add it to the Tracker
-            reader = writer.setMaxDataAge(maxAge).openFinalEarly();
+            SSTableReader reader = writer.setMaxDataAge(maxAge).openFinalEarly();
             transaction.update(reader, false);
             moveStarts(reader, reader.last);
             transaction.checkpoint();
@@ -357,8 +366,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         if (throwLate)
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
 
-        // TODO: do we always want to avoid obsoleting if offline?
-        if (!isOffline)
+        if (!keepOriginals)
             transaction.obsoleteOriginals();
 
         transaction.prepareToCommit();
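
The new keepOriginals flag separates "running offline" from "keeping the
source sstables", resolving the TODO removed above. A hedged usage sketch;
the constructor arguments are indicative only, inferred from the fields the
constructor assigns:

    // rewrite into new sstables but leave the originals in place
    SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, maxAge, isOffline)
                                   .keepOriginals(true);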

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index db6ed42..a70b92f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -131,11 +130,13 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         try
         {
             diskWriter.join();
+            checkForWriterException();
         }
-        catch (InterruptedException e)
+        catch (Throwable e)
         {
             throw new RuntimeException(e);
         }
+
         checkForWriterException();
     }
 
@@ -203,7 +204,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
                     if (b == SENTINEL)
                         return;
 
-                    try (SSTableWriter writer = createWriter())
+                    try (SSTableTxnWriter writer = createWriter())
                     {
                         for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
                             writer.append(entry.getValue().unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 0b06405..b22a048 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -43,14 +42,14 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
     protected DecoratedKey currentKey;
     protected PartitionUpdate update;
 
-    private SSTableWriter writer;
+    private SSTableTxnWriter writer;
 
     protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
     {
         super(directory, metadata, partitioner, columns);
     }
 
-    private SSTableWriter getOrCreateWriter()
+    private SSTableTxnWriter getOrCreateWriter()
     {
         if (writer == null)
             writer = createWriter();