Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/17 11:20:42 UTC

[6/7] cassandra git commit: Improve transaction log under FS corruption

Improve transaction log under FS corruption

To address concerns with recovery from file system
failures, we consolidate the new and old log
files into a single file, introduce checksums to the
file, and record on-disk sstable state to permit
validation of the log file on restart. This permits
us to be pessimistic in the case of any failure,
leaving all affected files on disk.

patch by stefania; reviewed by benedict for CASSANDRA-7066


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5726625a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5726625a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5726625a

Branch: refs/heads/trunk
Commit: 5726625a5c7cf47a67509540f7146d05b668bc20
Parents: 47b66d7
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Jul 29 12:01:01 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Aug 17 10:20:01 2015 +0100

----------------------------------------------------------------------
 NEWS.txt                                        |   18 +
 bin/sstablelister                               |   55 -
 bin/sstablelister.bat                           |   41 -
 bin/sstableutil                                 |   55 +
 bin/sstableutil.bat                             |   41 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   12 +-
 .../org/apache/cassandra/db/Directories.java    |  145 ++-
 .../db/compaction/CompactionManagerMBean.java   |    3 +-
 .../cassandra/db/compaction/Upgrader.java       |    9 -
 .../writers/MajorLeveledCompactionWriter.java   |    9 +-
 .../apache/cassandra/db/lifecycle/Helpers.java  |   12 +-
 .../db/lifecycle/LifecycleTransaction.java      |   69 +-
 .../apache/cassandra/db/lifecycle/Tracker.java  |    4 +-
 .../cassandra/db/lifecycle/TransactionLog.java  | 1137 ++++++++++++++++++
 .../cassandra/db/lifecycle/TransactionLogs.java |  786 ------------
 .../cassandra/io/sstable/SSTableLoader.java     |  162 ++-
 .../io/sstable/format/SSTableReader.java        |   22 +-
 .../cassandra/io/sstable/format/Version.java    |    2 +
 .../io/sstable/format/big/BigFormat.java        |   12 +
 .../io/sstable/metadata/CompactionMetadata.java |   43 +-
 .../metadata/LegacyMetadataSerializer.java      |   13 +-
 .../io/sstable/metadata/MetadataCollector.java  |   28 +-
 .../org/apache/cassandra/io/util/FileUtils.java |    5 -
 .../apache/cassandra/service/GCInspector.java   |    4 +-
 .../apache/cassandra/service/StartupChecks.java |    3 +-
 .../cassandra/service/StorageService.java       |    4 +-
 .../cassandra/tools/SSTableExpiredBlockers.java |    3 +-
 .../cassandra/tools/SSTableLevelResetter.java   |    3 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |    2 -
 .../cassandra/tools/SSTableOfflineRelevel.java  |    2 +-
 .../cassandra/tools/StandaloneLister.java       |  214 ----
 .../cassandra/tools/StandaloneSSTableUtil.java  |  241 ++++
 .../cassandra/tools/StandaloneScrubber.java     |    6 +-
 .../cassandra/tools/StandaloneSplitter.java     |    4 +-
 .../cassandra/tools/StandaloneUpgrader.java     |    6 +-
 .../cassandra/tools/StandaloneVerifier.java     |    2 +-
 .../apache/cassandra/tools/nodetool/Stop.java   |    2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |    2 +-
 .../apache/cassandra/db/DirectoriesTest.java    |    7 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |    6 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |    4 +-
 .../cassandra/db/lifecycle/HelpersTest.java     |    9 +-
 .../db/lifecycle/LifecycleTransactionTest.java  |    2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   15 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   90 +-
 .../db/lifecycle/TransactionLogTest.java        |  791 ++++++++++++
 .../db/lifecycle/TransactionLogsTest.java       |  581 ---------
 .../io/sstable/CQLSSTableWriterClientTest.java  |    7 -
 .../io/sstable/CQLSSTableWriterTest.java        |    1 -
 .../io/sstable/SSTableRewriterTest.java         |   36 +-
 .../metadata/MetadataSerializerTest.java        |    7 -
 .../org/apache/cassandra/schema/DefsTest.java   |   11 +-
 52 files changed, 2625 insertions(+), 2123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index ef764ad..365ed31 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,24 @@ Upgrading
    - The default JVM flag -XX:+PerfDisableSharedMem will cause the following tools JVM
      to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia.
      If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1}
+   - New transaction log files have been introduced to replace the compactions_in_progress
+     system table, temporary file markers (tmp and tmplink) and sstable ancestors.
+     Therefore, compaction metadata no longer contains ancestors. Transaction log files
+     list sstable descriptors involved in compactions and other operations such as flushing
+     and streaming. Use the sstableutil tool to list any sstable files currently involved
+     in operations not yet completed, which previously would have been marked as temporary.
+     A transaction log file contains one sstable per line, with the prefix "add:" or "remove:".
+     It also contains a special line "commit", inserted at the end only when the transaction
+     is committed. On startup we use these files to clean up any partial transactions that were
+     in progress when the process exited. If the commit line is found, we keep new sstables
+     (those with the "add" prefix) and delete the old sstables (those with the "remove" prefix),
+     vice-versa if the commit line is missing. Should you lose or delete these log files,
+     both old and new sstable files will be kept as live files, which will result in duplicated
+     sstables. These files are protected by incremental checksums so you should not manually
+     edit them. When restoring a full backup or moving sstable files, you should first clean up
+     any leftover transactions and their temporary files. You can use this command:
+      ===> sstableutil -c ks table
+     See CASSANDRA-7066 for full details.
    - New write stages have been added for batchlog and materialized view mutations
      you can set their size in cassandra.yaml
    - User defined functions are now executed in a sandbox.
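
For illustration, based on the NEWS entry above, a committed compaction's log file would hold
one "add:" line per new sstable, one "remove:" line per old sstable, and a final commit line,
each protected by an incremental checksum. A hypothetical example, following the schematic
record layout given in the TransactionLog class javadoc further below ([CRC] stands in for the
checksum value and the descriptor names are made up):

    add:[ks-table-ma-2][CRC]
    remove:[ks-table-ma-1,1439805036000,8][CRC]
    commit:[1439805037000][CRC]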

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/bin/sstablelister
----------------------------------------------------------------------
diff --git a/bin/sstablelister b/bin/sstablelister
deleted file mode 100755
index a79409d..0000000
--- a/bin/sstablelister
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/bin/sh
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
-    for include in /usr/share/cassandra/cassandra.in.sh \
-                   /usr/local/share/cassandra/cassandra.in.sh \
-                   /opt/cassandra/cassandra.in.sh \
-                   ~/.cassandra.in.sh \
-                   "`dirname "$0"`/cassandra.in.sh"; do
-        if [ -r "$include" ]; then
-            . "$include"
-            break
-        fi
-    done
-elif [ -r "$CASSANDRA_INCLUDE" ]; then
-    . "$CASSANDRA_INCLUDE"
-fi
-
-# Use JAVA_HOME if set, otherwise look for java in PATH
-if [ -x "$JAVA_HOME/bin/java" ]; then
-    JAVA="$JAVA_HOME/bin/java"
-else
-    JAVA="`which java`"
-fi
-
-if [ -z "$CLASSPATH" ]; then
-    echo "You must set the CLASSPATH var" >&2
-    exit 1
-fi
-
-if [ "x$MAX_HEAP_SIZE" = "x" ]; then
-    MAX_HEAP_SIZE="256M"
-fi
-
-"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
-        -Dcassandra.storagedir="$cassandra_storagedir" \
-        -Dlogback.configurationFile=logback-tools.xml \
-        org.apache.cassandra.tools.StandaloneLister "$@"
-
-# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/bin/sstablelister.bat
----------------------------------------------------------------------
diff --git a/bin/sstablelister.bat b/bin/sstablelister.bat
deleted file mode 100644
index cb50a08..0000000
--- a/bin/sstablelister.bat
+++ /dev/null
@@ -1,41 +0,0 @@
-@REM
-@REM  Licensed to the Apache Software Foundation (ASF) under one or more
-@REM  contributor license agreements.  See the NOTICE file distributed with
-@REM  this work for additional information regarding copyright ownership.
-@REM  The ASF licenses this file to You under the Apache License, Version 2.0
-@REM  (the "License"); you may not use this file except in compliance with
-@REM  the License.  You may obtain a copy of the License at
-@REM
-@REM      http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM  Unless required by applicable law or agreed to in writing, software
-@REM  distributed under the License is distributed on an "AS IS" BASIS,
-@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@REM  See the License for the specific language governing permissions and
-@REM  limitations under the License.
-
-@echo off
-if "%OS%" == "Windows_NT" setlocal
-
-pushd "%~dp0"
-call cassandra.in.bat
-
-if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneLister
-if NOT DEFINED JAVA_HOME goto :err
-
-REM ***** JAVA options *****
-set JAVA_OPTS=^
- -Dlogback.configurationFile=logback-tools.xml
-
-set TOOLS_PARAMS=
-
-"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %*
-goto finally
-
-:err
-echo JAVA_HOME environment variable must be set!
-pause
-
-:finally
-
-ENDLOCAL

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/bin/sstableutil
----------------------------------------------------------------------
diff --git a/bin/sstableutil b/bin/sstableutil
new file mode 100755
index 0000000..2860729
--- /dev/null
+++ b/bin/sstableutil
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "`dirname "$0"`/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.tools.StandaloneSSTableUtil "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
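
The wrapper honours the usual tool environment variables (CASSANDRA_INCLUDE, JAVA_HOME,
MAX_HEAP_SIZE). A hypothetical invocation of the cleanup described in NEWS.txt, with a
larger heap than the 256M default:

    MAX_HEAP_SIZE=512M bin/sstableutil -c ks table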

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/bin/sstableutil.bat
----------------------------------------------------------------------
diff --git a/bin/sstableutil.bat b/bin/sstableutil.bat
new file mode 100644
index 0000000..bc3eb8a
--- /dev/null
+++ b/bin/sstableutil.bat
@@ -0,0 +1,41 @@
+@REM
+@REM  Licensed to the Apache Software Foundation (ASF) under one or more
+@REM  contributor license agreements.  See the NOTICE file distributed with
+@REM  this work for additional information regarding copyright ownership.
+@REM  The ASF licenses this file to You under the Apache License, Version 2.0
+@REM  (the "License"); you may not use this file except in compliance with
+@REM  the License.  You may obtain a copy of the License at
+@REM
+@REM      http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM  Unless required by applicable law or agreed to in writing, software
+@REM  distributed under the License is distributed on an "AS IS" BASIS,
+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM  See the License for the specific language governing permissions and
+@REM  limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneSSTableUtil
+if NOT DEFINED JAVA_HOME goto :err
+
+REM ***** JAVA options *****
+set JAVA_OPTS=^
+ -Dlogback.configurationFile=logback-tools.xml
+
+set TOOLS_PARAMS=
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %*
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+:finally
+
+ENDLOCAL

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c7d12a2..202047b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -353,7 +353,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         if (data.loadsstables)
         {
-            Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true);
+            Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
             Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
             data.addInitialSSTables(sstables);
         }
@@ -459,7 +459,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
 
         data.dropSSTables();
-        TransactionLogs.waitForDeletions();
+        TransactionLog.waitForDeletions();
 
         indexManager.invalidate();
         materializedViewManager.invalidate();
@@ -503,7 +503,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         // get the max generation number, to prevent generation conflicts
         Directories directories = new Directories(metadata);
-        Directories.SSTableLister lister = directories.sstableLister().includeBackups(true);
+        Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
         {
@@ -534,7 +534,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         LifecycleTransaction.removeUnfinishedLeftovers(metadata);
 
         logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
-        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
+        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();
@@ -642,7 +642,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             currentDescriptors.add(sstable.descriptor);
         Set<SSTableReader> newSSTables = new HashSet<>();
 
-        Directories.SSTableLister lister = directories.sstableLister().skipTemporary(true);
+        Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
         {
             Descriptor descriptor = entry.getKey();
@@ -1644,7 +1644,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
             active.put(sstable.descriptor.generation, sstable);
-        Map<Descriptor, Set<Component>> snapshots = directories.sstableLister().snapshots(tag).list();
+        Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
         Refs<SSTableReader> refs = new Refs<>();
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index bede4c4..fa01269 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -31,6 +31,7 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
@@ -92,7 +93,6 @@ public class Directories
 
     public static final String BACKUPS_SUBDIR = "backups";
     public static final String SNAPSHOT_SUBDIR = "snapshots";
-    public static final String TRANSACTIONS_SUBDIR = "transactions";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
     public static final DataDirectory[] dataDirectories;
@@ -142,7 +142,7 @@ public class Directories
     {
         X, W, XW, R, XR, RW, XRW;
 
-        private FileAction()
+        FileAction()
         {
         }
 
@@ -468,40 +468,6 @@ public class Directories
         }
     }
 
-    public static File getTransactionsDirectory(File folder)
-    {
-        return getOrCreate(folder, TRANSACTIONS_SUBDIR);
-    }
-
-    public List<File> getExistingDirectories(String subFolder)
-    {
-        List<File> ret = new ArrayList<>();
-        for (File dir : dataPaths)
-        {
-            File subDir = getExistingDirectory(dir, subFolder);
-            if (subDir != null)
-                ret.add(subDir);
-
-        }
-        return ret;
-    }
-
-    public static File getExistingDirectory(File folder, String subFolder)
-    {
-        File subDir = new File(folder, join(subFolder));
-        if (subDir.exists())
-        {
-            assert(subDir.isDirectory());
-            return subDir;
-        }
-        return null;
-    }
-
-    public SSTableLister sstableLister()
-    {
-        return new SSTableLister();
-    }
-
     public static class DataDirectory
     {
         public final File location;
@@ -549,10 +515,42 @@ public class Directories
         }
     }
 
+    /** The type of files that can be listed by SSTableLister; we never return txn logs,
+     * use LifecycleTransaction.getFiles() if you need txn logs. */
+    public enum FileType
+    {
+        /** A permanent sstable file that is safe to use. */
+        FINAL,
+
+        /** A temporary sstable file that will soon be deleted. */
+        TEMPORARY,
+
+        /** A transaction log file (contains information on final and temporary files). */
+        TXN_LOG
+    }
+
+    /**
+     * How to handle a failure to read a txn log file. Note that we will try a few
+     * times before giving up.
+     **/
+    public enum OnTxnErr
+    {
+        /** Throw the exception */
+        THROW,
+
+        /** Ignore the txn log file */
+        IGNORE
+    }
+
+    public SSTableLister sstableLister(OnTxnErr onTxnErr)
+    {
+        return new SSTableLister(onTxnErr);
+    }
+
     public class SSTableLister
     {
+        private final OnTxnErr onTxnErr;
         private boolean skipTemporary;
-        private boolean onlyTemporary;
         private boolean includeBackups;
         private boolean onlyBackups;
         private int nbFiles;
@@ -560,19 +558,16 @@ public class Directories
         private boolean filtered;
         private String snapshotName;
 
-        public SSTableLister skipTemporary(boolean b)
+        private SSTableLister(OnTxnErr onTxnErr)
         {
-            if (filtered)
-                throw new IllegalStateException("list() has already been called");
-            skipTemporary = b;
-            return this;
+            this.onTxnErr = onTxnErr;
         }
 
-        public SSTableLister onlyTemporary(boolean b)
+        public SSTableLister skipTemporary(boolean b)
         {
             if (filtered)
                 throw new IllegalStateException("list() has already been called");
-            onlyTemporary = b;
+            skipTemporary = b;
             return this;
         }
 
@@ -633,56 +628,54 @@ public class Directories
 
                 if (snapshotName != null)
                 {
-                    getSnapshotDirectory(location, snapshotName).listFiles(getFilter(location));
+                    LifecycleTransaction.getFiles(getSnapshotDirectory(location, snapshotName).toPath(), getFilter(), onTxnErr);
                     continue;
                 }
 
                 if (!onlyBackups)
-                    location.listFiles(getFilter(location));
+                    LifecycleTransaction.getFiles(location.toPath(), getFilter(), onTxnErr);
 
                 if (includeBackups)
-                    getBackupsDirectory(location).listFiles(getFilter(location));
+                    LifecycleTransaction.getFiles(getBackupsDirectory(location).toPath(), getFilter(), onTxnErr);
             }
+
             filtered = true;
         }
 
-        private FileFilter getFilter(File location)
+        private BiFunction<File, FileType, Boolean> getFilter()
         {
-           final Set<File> temporaryFiles = skipTemporary || onlyTemporary
-                                            ? LifecycleTransaction.getTemporaryFiles(metadata, location)
-                                            : Collections.<File>emptySet();
-
-            return new FileFilter()
+            // This function always returns false since it adds to the components map
+            return (file, type) ->
             {
-                // This function always return false since accepts adds to the components map
-                public boolean accept(File file)
+                switch (type)
                 {
-                    if (file.isDirectory())
-                        return false;
-
-                    Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
-                    if (pair == null)
+                    case TXN_LOG:
                         return false;
+                    case TEMPORARY:
+                        if (skipTemporary)
+                            return false;
 
-                    // we are only interested in the SSTable files that belong to the specific ColumnFamily
-                    if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
-                        return false;
+                    case FINAL:
+                        Pair<Descriptor, Component> pair = SSTable.tryComponentFromFilename(file.getParentFile(), file.getName());
+                        if (pair == null)
+                            return false;
 
-                    if (skipTemporary && temporaryFiles.contains(file))
-                        return false;
+                        // we are only interested in the SSTable files that belong to the specific ColumnFamily
+                        if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
+                            return false;
 
-                    if (onlyTemporary && !temporaryFiles.contains(file))
+                        Set<Component> previous = components.get(pair.left);
+                        if (previous == null)
+                        {
+                            previous = new HashSet<>();
+                            components.put(pair.left, previous);
+                        }
+                        previous.add(pair.right);
+                        nbFiles++;
                         return false;
 
-                    Set<Component> previous = components.get(pair.left);
-                    if (previous == null)
-                    {
-                        previous = new HashSet<>();
-                        components.put(pair.left, previous);
-                    }
-                    previous.add(pair.right);
-                    nbFiles++;
-                    return false;
+                    default:
+                        throw new AssertionError();
                 }
             };
         }
@@ -917,7 +910,7 @@ public class Directories
         {
             super();
             Builder<String> builder = ImmutableSet.builder();
-            for (File file : sstableLister().listFiles())
+            for (File file : sstableLister(Directories.OnTxnErr.THROW).listFiles())
                 builder.add(file.getName());
             alive = builder.build();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 8e200a1..d5da0fe 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -58,7 +58,8 @@ public interface CompactionManagerMBean
     /**
      * Stop an individual running compaction using the compactionId.
      * @param compactionId Compaction ID of compaction to stop. Such IDs can be found in
-     *                     the compactions_in_progress table of the system keyspace.
+     *                     the transaction log files whose names start with compaction_,
+     *                     located in the table's transactions folder.
      */
     public void stopCompactionById(String compactionId);
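
A sketch of stopping a compaction through this MBean over JMX; the object name matches the
CompactionManager registration, 7199 is the default JMX port, and the compaction id is a
made-up example:

    import javax.management.JMX;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url))
    {
        CompactionManagerMBean proxy =
            JMX.newMBeanProxy(connector.getMBeanServerConnection(),
                              new ObjectName("org.apache.cassandra.db:type=CompactionManager"),
                              CompactionManagerMBean.class);
        proxy.stopCompactionById("70deb8a0-44cd-11e5-a456-f0def1b212c2"); // id from the txn log file name
    }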
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 5a36210..ebfd997 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -69,15 +69,6 @@ public class Upgrader
     private SSTableWriter createCompactionWriter(long repairedAt)
     {
         MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
-
-        // Get the max timestamp of the precompacted sstables
-        // and adds generation of live ancestors
-        sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
-        for (Integer i : sstable.getAncestors())
-        {
-            if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                sstableMetadataCollector.addAncestor(i);
-        }
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)),
                                     estimatedRows,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index a826809..73ce216 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -45,7 +45,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private long partitionsWritten = 0;
     private long totalWrittenInLevel = 0;
     private int sstablesWritten = 0;
-    private final boolean skipAncestors;
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
                                         LifecycleTransaction txn,
@@ -70,17 +69,13 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-        skipAncestors = estimatedSSTables * allSSTables.size() > 200000; // magic number, avoid storing too much ancestor information since allSSTables are ancestors to *all* resulting sstables
-
-        if (skipAncestors)
-            logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory");
 
         @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     keysPerSSTable,
                                                     minRepairedAt,
                                                     cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                     txn);
         sstableWriter.switchWriter(writer);
@@ -108,7 +103,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                         averageEstimatedKeysPerSSTable,
                                                         minRepairedAt,
                                                         cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
+                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
                                                         SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                         txn);
             sstableWriter.switchWriter(writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index 8cb92b9..98983c5 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -106,12 +106,12 @@ class Helpers
             assert !reader.isReplaced();
     }
 
-    static Throwable markObsolete(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+    static Throwable markObsolete(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())
             return accumulate;
 
-        for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+        for (TransactionLog.Obsoletion obsoletion : obsoletions)
         {
             try
             {
@@ -125,13 +125,13 @@ class Helpers
         return accumulate;
     }
 
-    static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLogs txnLogs, List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+    static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLog txnLogs, List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
     {
         for (SSTableReader reader : readers)
         {
             try
             {
-                obsoletions.add(new TransactionLogs.Obsoletion(reader, txnLogs.obsoleted(reader)));
+                obsoletions.add(new TransactionLog.Obsoletion(reader, txnLogs.obsoleted(reader)));
             }
             catch (Throwable t)
             {
@@ -141,12 +141,12 @@ class Helpers
         return accumulate;
     }
 
-    static Throwable abortObsoletion(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+    static Throwable abortObsoletion(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())
             return accumulate;
 
-        for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+        for (TransactionLog.Obsoletion obsoletion : obsoletions)
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 5bda4d4..c6cb979 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -18,10 +18,11 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.*;
+import java.util.function.BiFunction;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 
@@ -29,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -96,7 +98,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
 
     public final Tracker tracker;
     // The transaction logs keep track of new and old sstable files
-    private final TransactionLogs transactionLogs;
+    private final TransactionLog transactionLog;
     // the original readers this transaction was opened over, and that it guards
     // (no other transactions may operate over these readers concurrently)
     private final Set<SSTableReader> originals = new HashSet<>();
@@ -113,7 +115,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     private final State staged = new State();
 
     // the tidier and their readers, to be used for marking readers obsoleted during a commit
-    private List<TransactionLogs.Obsoletion> obsoletions;
+    private List<TransactionLog.Obsoletion> obsoletions;
 
     /**
      * construct a Transaction for use in an offline operation
@@ -141,7 +143,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
     {
         Tracker dummy = new Tracker(null, false);
-        return new LifecycleTransaction(dummy, new TransactionLogs(operationType, metadata, dummy), Collections.emptyList());
+        return new LifecycleTransaction(dummy, new TransactionLog(operationType, metadata, dummy), Collections.emptyList());
     }
 
     /**
@@ -150,18 +152,18 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
     public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
     {
         Tracker dummy = new Tracker(null, false);
-        return new LifecycleTransaction(dummy, new TransactionLogs(operationType, operationFolder, dummy), Collections.emptyList());
+        return new LifecycleTransaction(dummy, new TransactionLog(operationType, operationFolder, dummy), Collections.emptyList());
     }
 
     LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
     {
-        this(tracker, new TransactionLogs(operationType, getMetadata(tracker, readers), tracker), readers);
+        this(tracker, new TransactionLog(operationType, getMetadata(tracker, readers), tracker), readers);
     }
 
-    LifecycleTransaction(Tracker tracker, TransactionLogs transactionLogs, Iterable<SSTableReader> readers)
+    LifecycleTransaction(Tracker tracker, TransactionLog transactionLog, Iterable<SSTableReader> readers)
     {
         this.tracker = tracker;
-        this.transactionLogs = transactionLogs;
+        this.transactionLog = transactionLog;
         for (SSTableReader reader : readers)
         {
             originals.add(reader);
@@ -185,19 +187,19 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         return null;
     }
 
-    public TransactionLogs logs()
+    public TransactionLog log()
     {
-        return transactionLogs;
+        return transactionLog;
     }
 
     public OperationType opType()
     {
-        return transactionLogs.getType();
+        return transactionLog.getType();
     }
 
     public UUID opId()
     {
-        return transactionLogs.getId();
+        return transactionLog.getId();
     }
 
     public void doPrepare()
@@ -210,8 +212,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
 
         // prepare for compaction obsolete readers as long as they were part of the original set
         // since those that are not original are early readers that share the same desc with the finals
-        maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLogs, obsoletions = new ArrayList<>(), null));
-        transactionLogs.prepareToCommit();
+        maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLog, obsoletions = new ArrayList<>(), null));
+        transactionLog.prepareToCommit();
     }
 
     /**
@@ -226,7 +228,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         maybeFail(accumulate);
 
         // transaction log commit failure means we must abort; safe commit is not possible
-        maybeFail(transactionLogs.commit(null));
+        maybeFail(transactionLog.commit(null));
 
         // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
         // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
@@ -235,7 +237,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         accumulate = markObsolete(obsoletions, accumulate);
         accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
         accumulate = release(selfRefs(logged.obsolete), accumulate);
-        accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLogs.getType(), accumulate);
+        accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLog.getType(), accumulate);
 
         return accumulate;
     }
@@ -251,16 +253,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         accumulate = abortObsoletion(obsoletions, accumulate);
 
         if (logged.isEmpty() && staged.isEmpty())
-            return transactionLogs.abort(accumulate);
+            return transactionLog.abort(accumulate);
 
         // mark obsolete all readers that are not versions of those present in the original set
         Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
         logger.debug("Obsoleting {}", obsolete);
 
-        accumulate = prepareForObsoletion(obsolete, transactionLogs, obsoletions = new ArrayList<>(), accumulate);
+        accumulate = prepareForObsoletion(obsolete, transactionLog, obsoletions = new ArrayList<>(), accumulate);
         // it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
         // a failure to abort, which is useful information to have for debug
-        accumulate = transactionLogs.abort(accumulate);
+        accumulate = transactionLog.abort(accumulate);
         accumulate = markObsolete(obsoletions, accumulate);
 
         // replace all updated readers with a version restored to its original state
@@ -491,7 +493,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
             originals.remove(reader);
             marked.remove(reader);
         }
-        return new LifecycleTransaction(tracker, transactionLogs.getType(), readers);
+        return new LifecycleTransaction(tracker, transactionLog.getType(), readers);
     }
 
     /**
@@ -524,27 +526,34 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
 
     public void trackNew(SSTable table)
     {
-        transactionLogs.trackNew(table);
+        transactionLog.trackNew(table);
     }
 
     public void untrackNew(SSTable table)
     {
-        transactionLogs.untrackNew(table);
+        transactionLog.untrackNew(table);
     }
 
     public static void removeUnfinishedLeftovers(CFMetaData metadata)
     {
-        TransactionLogs.removeUnfinishedLeftovers(metadata);
+        TransactionLog.removeUnfinishedLeftovers(metadata);
     }
 
-    public static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
-    {
-        return TransactionLogs.getTemporaryFiles(metadata, folder);
-    }
-
-    public static Set<File> getLogFiles(CFMetaData metadata)
+    /**
+     * Get the files in the specified folder for which the filter returns true.
+     * The filter is given each file and its type, and decides which files should be returned
+     * and which should be discarded. To classify files by type, we read the transaction
+     * log files. If we still fail to read a log file after a few attempts, we consult onTxnErr
+     * to determine what to do.
+     *
+     * @param folder - the folder to scan
+     * @param onTxnErr - how to handle a failure to read a txn log file
+     * @param filter - a function that receives each file and its type; it should return true for the file to be returned
+     * @return - the list of files that were scanned and for which the filter returned true
+     */
+    public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr)
     {
-        return TransactionLogs.getLogFiles(metadata);
+        return new TransactionLog.FileLister(folder, filter, onTxnErr).list();
     }
 
     // a class representing the current state of the reader within this transaction, encoding the actions both logged
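
For example, a caller interested only in temporary files, ignoring unreadable txn logs, might
use (a sketch; the data folder path is hypothetical):

    List<File> temporaries =
        LifecycleTransaction.getFiles(new File("/var/lib/cassandra/data/ks/table").toPath(),
                                      (file, type) -> type == Directories.FileType.TEMPORARY,
                                      Directories.OnTxnErr.IGNORE);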

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index ea76fd6..6f6aca9 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -227,7 +227,7 @@ public class Tracker
      */
     public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
     {
-        try (TransactionLogs txnLogs = new TransactionLogs(operationType, cfstore.metadata, this))
+        try (TransactionLog txnLogs = new TransactionLog(operationType, cfstore.metadata, this))
         {
             Pair<View, View> result = apply(view -> {
                 Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
@@ -239,7 +239,7 @@ public class Tracker
 
             // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
             // to complete the instructions given to it
-            List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+            List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>();
             accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5726625a/src/java/org/apache/cassandra/db/lifecycle/TransactionLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/TransactionLog.java b/src/java/org/apache/cassandra/db/lifecycle/TransactionLog.java
new file mode 100644
index 0000000..a9e460c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionLog.java
@@ -0,0 +1,1137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.SecureDirectoryStream;
+import java.util.*;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Runnables;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.db.Directories.OnTxnErr;
+import static org.apache.cassandra.db.Directories.FileType;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
+ * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
+ * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
+ * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
+ *
+ * A class that tracks sstable files involved in a transaction across sstables:
+ * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
+ *
+ * The transaction log file contains new and old sstables as follows:
+ *
+ * add:[sstable-2][CRC]
+ * remove:[sstable-1,max_update_time,num files][CRC]
+ *
+ * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
+ * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
+ * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
+ * and file sizes.
+ *
+ * Upon commit we add a final line to the log file:
+ *
+ * commit:[commit_time][CRC]
+ *
+ * When the transaction log is cleaned up by the TransactionTidier, which happens only after any old sstables have been
+ * obsoleted, the files of the old sstables are removed before the transaction log is deleted if the transaction
+ * was committed; vice-versa if the transaction was aborted.
+ *
+ * On start-up we look for any transaction log files and repeat the cleanup process described above.
+ *
+ * See CASSANDRA-7066 for full details.
+ */
+public class TransactionLog extends Transactional.AbstractTransactional implements Transactional
+{
+    private static final Logger logger = LoggerFactory.getLogger(TransactionLog.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.HOURS);
+
+    /**
+     * If the format of the lines in the transaction log is wrong or the checksum
+     * does not match, then we throw this exception.
+     */
+    public static final class CorruptTransactionLogException extends RuntimeException
+    {
+        public final TransactionFile file;
+
+        public CorruptTransactionLogException(String message, TransactionFile file)
+        {
+            super(message);
+            this.file = file;
+        }
+    }
+
+    public enum RecordType
+    {
+        ADD,    // new files to be retained on commit
+        REMOVE, // old files to be retained on abort
+        COMMIT, // commit flag
+        ABORT;  // abort flag
+        public static RecordType fromPrefix(String prefix)
+        {
+            return valueOf(prefix.toUpperCase());
+        }
+    }
+
+    /**
+     * A log file record, each record is encoded in one line and has different
+     * content depending on the record type.
+     */
+    final static class Record
+    {
+        public final RecordType type;
+        public final String relativeFilePath;
+        public final long updateTime;
+        public final int numFiles;
+        public final String record;
+
+        static String REGEX_STR = "^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]$";
+        static Pattern REGEX = Pattern.compile(REGEX_STR, Pattern.CASE_INSENSITIVE); // (add|remove|commit|abort):[*,*,*]
+
+        public static Record make(String record, boolean isLast)
+        {
+            try
+            {
+                Matcher matcher = REGEX.matcher(record);
+                if (!matcher.matches() || matcher.groupCount() != 4)
+                    throw new IllegalStateException(String.format("Invalid record \"%s\"", record));
+
+                RecordType type = RecordType.fromPrefix(matcher.group(1));
+                return new Record(type, matcher.group(2), Long.valueOf(matcher.group(3)), Integer.valueOf(matcher.group(4)), record);
+            }
+            catch (Throwable t)
+            {
+                if (!isLast)
+                    throw t;
+
+                int pos = record.indexOf(':');
+                if (pos <= 0)
+                    throw t;
+
+                RecordType recordType;
+                try
+                {
+                    recordType = RecordType.fromPrefix(record.substring(0, pos));
+                }
+                catch (Throwable ignore)
+                {
+                    throw t;
+                }
+
+                return new Record(recordType, "", 0, 0, record);
+            }
+        }
+
+        public static Record makeCommit(long updateTime)
+        {
+            return new Record(RecordType.COMMIT, "", updateTime, 0, "");
+        }
+
+        public static Record makeAbort(long updateTime)
+        {
+            return new Record(RecordType.ABORT, "", updateTime, 0, "");
+        }
+
+        public static Record makeNew(String relativeFilePath)
+        {
+            return new Record(RecordType.ADD, relativeFilePath, 0, 0, "");
+        }
+
+        public static Record makeOld(String parentFolder, String relativeFilePath)
+        {
+            return makeOld(getTrackedFiles(parentFolder, relativeFilePath), relativeFilePath);
+        }
+
+        public static Record makeOld(List<File> files, String relativeFilePath)
+        {
+            long lastModified = files.stream()
+                                     .mapToLong(File::lastModified)
+                                     .reduce(0L, Long::max);
+            return new Record(RecordType.REMOVE, relativeFilePath, lastModified, files.size(), "");
+        }
+
+        private Record(RecordType type,
+                       String relativeFilePath,
+                       long updateTime,
+                       int numFiles,
+                       String record)
+        {
+            this.type = type;
+            this.relativeFilePath = hasFilePath(type) ? relativeFilePath : ""; // only meaningful for some records
+            this.updateTime = type == RecordType.REMOVE ? updateTime : 0; // only meaningful for old records
+            this.numFiles = type == RecordType.REMOVE ? numFiles : 0; // only meaningful for old records
+            this.record = record.isEmpty() ? format() : record;
+        }
+
+        private static boolean hasFilePath(RecordType type)
+        {
+            return type == RecordType.ADD || type == RecordType.REMOVE;
+        }
+
+        private String format()
+        {
+            return String.format("%s:[%s,%d,%d]", type.toString(), relativeFilePath, updateTime, numFiles);
+        }
+
+        public byte[] getBytes()
+        {
+            return record.getBytes();
+        }
+
+        public boolean verify(String parentFolder, boolean lastRecordIsCorrupt)
+        {
+            if (type != RecordType.REMOVE)
+                return true;
+
+            List<File> files = getTrackedFiles(parentFolder);
+
+            // Paranoid sanity checks: we create another record by looking at the files as they are
+            // on disk right now and make sure the information still matches
+            Record currentRecord = Record.makeOld(files, relativeFilePath);
+            if (updateTime != currentRecord.updateTime)
+            {
+                logger.error("Possible disk corruption detected for sstable [{}], record [{}]: last update time [{}] should have been [{}]",
+                             relativeFilePath,
+                             record,
+                             new Date(currentRecord.updateTime),
+                             new Date(updateTime));
+                return false;
+            }
+
+            if (lastRecordIsCorrupt && currentRecord.numFiles < numFiles)
+            { // if the last record is corrupt, we only continue when at least as many files still exist on disk as were recorded
+                logger.error("Possible disk corruption detected for sstable [{}], record [{}]: number of files [{}] should have been [{}]",
+                             relativeFilePath,
+                             record,
+                             currentRecord.numFiles,
+                             numFiles);
+                return false;
+            }
+
+            return true;
+        }
+
+        public List<File> getTrackedFiles(String parentFolder)
+        {
+            if (!hasFilePath(type))
+                return Collections.emptyList();
+
+            return getTrackedFiles(parentFolder, relativeFilePath);
+        }
+
+        public static List<File> getTrackedFiles(String parentFolder, String relativeFilePath)
+        {
+            // listFiles() returns null if the folder does not exist or an I/O error occurs
+            File[] files = new File(parentFolder).listFiles((dir, name) -> name.startsWith(relativeFilePath));
+            return files == null ? Collections.emptyList() : Arrays.asList(files);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            // see comment in equals
+            return Objects.hash(type, relativeFilePath);
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (obj == null)
+                return false;
+
+            if (getClass() != obj.getClass())
+                return false;
+
+            final Record other = (Record)obj;
+
+            // we deliberately exclude the checksum, update time and file count:
+            // we don't want duplicate records that differ only in properties
+            // that may change on disk; in particular there should be only one
+            // COMMIT record, regardless of its update time
+            return type.equals(other.type) &&
+                   relativeFilePath.equals(other.relativeFilePath);
+        }
+
+        @Override
+        public String toString()
+        {
+            return record;
+        }
+    }
+
+    /**
+     * The transaction log file, which contains many records.
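+     * <p>
+     * For illustration only, a plausible log file for a committed compaction; each line
+     * carries the running CRC32 of all record text up to and including that line
+     * (base names, timestamps, file counts and checksum values below are made up):
+     * <pre>
+     * add:[ma-2-big,0,0][2887286862]
+     * remove:[ma-1-big,1439399599303,8][2982924804]
+     * commit:[,0,0][3928750305]
+     * </pre>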
+     */
+    final static class TransactionFile
+    {
+        static String EXT = ".log";
+        static char SEP = '_';
+        // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion)
+        static String FILE_REGEX_STR = String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT);
+        static Pattern FILE_REGEX = Pattern.compile(FILE_REGEX_STR);
+        static String LINE_REGEX_STR = "^(.*)\\[(\\d*)\\]$"; // *[checksum]
+        static Pattern LINE_REGEX = Pattern.compile(LINE_REGEX_STR);
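+
+        // For illustration, a hypothetical file name matching FILE_REGEX
+        // (sstable version "ma", a compaction, and a random time-UUID):
+        //   ma_txn_compaction_55e52a90-4c35-11e5-88ec-5254000e8a2b.log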
+
+        public final File file;
+        public final TransactionData parent;
+        public final Set<Record> records = new HashSet<>();
+        public final Checksum checksum = new CRC32();
+
+        public TransactionFile(TransactionData parent)
+        {
+            this.file = new File(parent.getFileName());
+            this.parent = parent;
+        }
+
+        public void readRecords()
+        {
+            records.clear();
+            checksum.reset();
+
+            Iterator<String> it = FileUtils.readLines(file).iterator();
+            while (it.hasNext())
+                records.add(readRecord(it.next(), !it.hasNext())); // arguments evaluate left-to-right (JLS), so hasNext() reflects whether the line just consumed was the last
+
+            for (Record record : records)
+            {
+                if (!record.verify(parent.getFolder(), false))
+                    throw new CorruptTransactionLogException(String.format("Failed to verify transaction %s record [%s]: possible disk corruption, aborting", parent.getId(), record),
+                                                             this);
+            }
+        }
+
+        private Record readRecord(String line, boolean isLast)
+        {
+            Matcher matcher = LINE_REGEX.matcher(line);
+            if (!matcher.matches() || matcher.groupCount() != 2)
+            {
+                handleReadRecordError(String.format("cannot parse line \"%s\"", line), isLast);
+                return Record.make(line, isLast);
+            }
+
+            byte[] bytes = matcher.group(1).getBytes();
+            checksum.update(bytes, 0, bytes.length);
+
+            if (checksum.getValue() != Long.parseLong(matcher.group(2)))
+                handleReadRecordError(String.format("invalid line checksum %s for \"%s\"", matcher.group(2), line), isLast);
+
+            try
+            {
+                return Record.make(matcher.group(1), isLast);
+            }
+            catch (Throwable t)
+            {
+                throw new CorruptTransactionLogException(String.format("Cannot make record \"%s\": %s", line, t.getMessage()), this);
+            }
+        }
+
+        private void handleReadRecordError(String message, boolean isLast)
+        {
+            if (isLast)
+            {
+                for (Record record : records)
+                {
+                    if (!record.verify(parent.getFolder(), true))
+                        throw new CorruptTransactionLogException(String.format("Last record of transaction %s is corrupt [%s] and at least " +
+                                                                               "one previous record does not match state on disk, possible disk corruption, aborting",
+                                                                               parent.getId(), message),
+                                                                 this);
+                }
+
+                // if only the last record is corrupt and all other records have matching files on disk (see Record.verify),
+                // then the process most likely exited whilst serializing the last record, so we carry on
+                logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], but all previous records match state on disk; continuing", parent.getId(), message));
+            }
+            else
+            {
+                throw new CorruptTransactionLogException(String.format("Non-last record of transaction %s is corrupt [%s], possible disk corruption, aborting", parent.getId(), message), this);
+            }
+        }
+
+        public void commit()
+        {
+            assert !completed() : "Already completed!";
+            addRecord(Record.makeCommit(System.currentTimeMillis()));
+        }
+
+        public void abort()
+        {
+            assert !completed() : "Already completed!";
+            addRecord(Record.makeAbort(System.currentTimeMillis()));
+        }
+
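+        // a zeroed update time works as a lookup key below because Record equality
+        // deliberately ignores the update time (see Record.equals)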
+        public boolean committed()
+        {
+            return records.contains(Record.makeCommit(0));
+        }
+
+        public boolean aborted()
+        {
+            return records.contains(Record.makeAbort(0));
+        }
+
+        public boolean completed()
+        {
+            return committed() || aborted();
+        }
+
+        public boolean add(RecordType type, SSTable table)
+        {
+            Record record = makeRecord(type, table);
+            if (records.contains(record))
+                return false;
+
+            addRecord(record);
+            return true;
+        }
+
+        private Record makeRecord(RecordType type, SSTable table)
+        {
+            String relativePath = FileUtils.getRelativePath(parent.getFolder(), table.descriptor.baseFilename());
+            if (type == RecordType.ADD)
+            {
+                return Record.makeNew(relativePath);
+            }
+            else if (type == RecordType.REMOVE)
+            {
+                return Record.makeOld(parent.getFolder(), relativePath);
+            }
+            else
+            {
+                throw new AssertionError("Invalid record type " + type);
+            }
+        }
+
+        private void addRecord(Record record)
+        {
+            // the checksum is cumulative: it covers the text of every record so far, but never the appended checksum values themselves
+            byte[] bytes = record.getBytes();
+            checksum.update(bytes, 0, bytes.length);
+
+            records.add(record);
+            FileUtils.append(file, String.format("%s[%d]", record, checksum.getValue()));
+
+            parent.sync();
+        }
+
+        public void remove(RecordType type, SSTable table)
+        {
+            Record record = makeRecord(type, table);
+
+            assert records.contains(record) : String.format("[%s] is not tracked by %s", record, file);
+
+            records.remove(record);
+            deleteRecord(record);
+        }
+
+        public boolean contains(RecordType type, SSTable table)
+        {
+            return records.contains(makeRecord(type, table));
+        }
+
+        public void deleteRecords(RecordType type)
+        {
+            assert file.exists() : String.format("Expected %s to exist", file);
+            records.stream()
+                   .filter((r) -> r.type == type)
+                   .forEach(this::deleteRecord);
+            records.clear();
+        }
+
+        private void deleteRecord(Record record)
+        {
+            List<File> files = record.getTrackedFiles(parent.getFolder());
+            if (files.isEmpty())
+                return; // Files no longer exist, nothing to do
+
+            // we sort the files in ascending update time order so that the last update time
+            // stays the same even if we only partially delete files
+            files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified()));
+
+            files.forEach(TransactionLog::delete);
+        }
+
+        public Set<File> getTrackedFiles(RecordType type)
+        {
+            return records.stream()
+                          .filter((r) -> r.type == type)
+                          .map((r) -> r.getTrackedFiles(parent.getFolder()))
+                          .flatMap(List::stream)
+                          .collect(Collectors.toSet());
+        }
+
+        public void delete()
+        {
+            TransactionLog.delete(file);
+        }
+
+        public boolean exists()
+        {
+            return file.exists();
+        }
+
+        @Override
+        public String toString()
+        {
+            return FileUtils.getRelativePath(parent.getFolder(), FileUtils.getCanonicalPath(file));
+        }
+    }
+
+    /**
+     * The transaction state is kept separate from TransactionLog, which implements the
+     * transactional behavior, because on startup we must reconstruct any leftovers and
+     * clean them up, and work out which files are temporary, without engaging the full
+     * transactional machinery. Keeping the state separate is also handy for the
+     * TransactionTidier.
+     */
+    final static class TransactionData implements AutoCloseable
+    {
+        private final OperationType opType;
+        private final UUID id;
+        private final File folder;
+        private final TransactionFile file;
+        private int folderDescriptor;
+
+        static TransactionData make(File logFile)
+        {
+            Matcher matcher = TransactionFile.FILE_REGEX.matcher(logFile.getName());
+            assert matcher.matches() && matcher.groupCount() == 3;
+
+            // For now we don't need this but it is there in case we need to change
+            // file format later on, the version is the sstable version as defined in BigFormat
+            //String version = matcher.group(1);
+
+            OperationType operationType = OperationType.fromFileName(matcher.group(2));
+            UUID id = UUID.fromString(matcher.group(3));
+
+            return new TransactionData(operationType, logFile.getParentFile(), id);
+        }
+
+        TransactionData(OperationType opType, File folder, UUID id)
+        {
+            this.opType = opType;
+            this.id = id;
+            this.folder = folder;
+            this.file = new TransactionFile(this);
+            this.folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
+        }
+
+        public Throwable readLogFile(Throwable accumulate)
+        {
+            try
+            {
+                file.readRecords();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+
+            return accumulate;
+        }
+
+        public void close()
+        {
+            if (folderDescriptor > 0)
+            {
+                CLibrary.tryCloseFD(folderDescriptor);
+                folderDescriptor = -1;
+            }
+        }
+
+        void sync()
+        {
+            if (folderDescriptor > 0)
+                CLibrary.trySync(folderDescriptor);
+        }
+
+        OperationType getType()
+        {
+            return opType;
+        }
+
+        UUID getId()
+        {
+            return id;
+        }
+
+        boolean completed()
+        {
+            return file.completed();
+        }
+
+        Throwable removeUnfinishedLeftovers(Throwable accumulate)
+        {
+            try
+            {
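+                // on commit we delete the sstables the transaction obsoleted;
+                // on abort, or if no outcome was recorded, we delete the new sstables it created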
+                if (file.committed())
+                    file.deleteRecords(RecordType.REMOVE);
+                else
+                    file.deleteRecords(RecordType.ADD);
+
+                // we sync the parent file descriptor between contents and log deletion
+                // to ensure there is a happens before edge between them
+                sync();
+
+                file.delete();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+
+            return accumulate;
+        }
+
+        Set<File> getTemporaryFiles()
+        {
+            sync();
+
+            if (!file.exists())
+                return Collections.emptySet();
+
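+            // if the transaction committed, the obsoleted sstables are the temporary
+            // files still awaiting deletion; otherwise the new, partially written
+            // sstables are the temporary ones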
+            if (file.committed())
+                return file.getTrackedFiles(RecordType.REMOVE);
+            else
+                return file.getTrackedFiles(RecordType.ADD);
+        }
+
+        String getFileName()
+        {
+            String fileName = StringUtils.join(BigFormat.latestVersion,
+                                               TransactionFile.SEP,
+                                               "txn",
+                                               TransactionFile.SEP,
+                                               opType.fileName,
+                                               TransactionFile.SEP,
+                                               id.toString(),
+                                               TransactionFile.EXT);
+            return StringUtils.join(folder, File.separator, fileName);
+        }
+
+        String getFolder()
+        {
+            return folder.getPath();
+        }
+
+        static boolean isLogFile(String name)
+        {
+            return TransactionFile.FILE_REGEX.matcher(name).matches();
+        }
+
+        @VisibleForTesting
+        TransactionFile getLogFile()
+        {
+            return file;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[%s]", file.toString());
+        }
+    }
+
+    private final Tracker tracker;
+    private final TransactionData data;
+    private final Ref<TransactionLog> selfRef;
+    // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+    // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+    // Additionally, we need to make sure to delete the data file first, so on restart the others
+    // will be recognized as GCable.
+    private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
+
+    TransactionLog(OperationType opType, CFMetaData metadata)
+    {
+        this(opType, metadata, null);
+    }
+
+    TransactionLog(OperationType opType, CFMetaData metadata, Tracker tracker)
+    {
+        this(opType, new Directories(metadata), tracker);
+    }
+
+    TransactionLog(OperationType opType, Directories directories, Tracker tracker)
+    {
+        this(opType, directories.getDirectoryForNewSSTables(), tracker);
+    }
+
+    TransactionLog(OperationType opType, File folder, Tracker tracker)
+    {
+        this.tracker = tracker;
+        this.data = new TransactionData(opType,
+                                        folder,
+                                        UUIDGen.getTimeUUID());
+        this.selfRef = new Ref<>(this, new TransactionTidier(data));
+
+        if (logger.isDebugEnabled())
+            logger.debug("Created transaction logs with id {}", data.id);
+    }
+
+    /**
+     * Track a reader as new.
+     **/
+    void trackNew(SSTable table)
+    {
+        if (!data.file.add(RecordType.ADD, table))
+            throw new IllegalStateException(table + " is already tracked as new");
+    }
+
+    /**
+     * Stop tracking a reader as new.
+     */
+    void untrackNew(SSTable table)
+    {
+        data.file.remove(RecordType.ADD, table);
+    }
+
+    /**
+     * Schedule a reader for deletion as soon as it is fully unreferenced and the transaction
+     * has been committed.
+     */
+    SSTableTidier obsoleted(SSTableReader reader)
+    {
+        if (data.file.contains(RecordType.ADD, reader))
+        {
+            if (data.file.contains(RecordType.REMOVE, reader))
+                throw new IllegalArgumentException("new sstable " + reader + " is already obsoleted");
+
+            return new SSTableTidier(reader, true, this);
+        }
+
+        if (!data.file.add(RecordType.REMOVE, reader))
+            throw new IllegalStateException(reader + " is already obsoleted");
+
+        if (tracker != null)
+            tracker.notifyDeleting(reader);
+
+        return new SSTableTidier(reader, false, this);
+    }
+
+    OperationType getType()
+    {
+        return data.getType();
+    }
+
+    UUID getId()
+    {
+        return data.getId();
+    }
+
+    @VisibleForTesting
+    String getDataFolder()
+    {
+        return data.getFolder();
+    }
+
+    @VisibleForTesting
+    TransactionData getData()
+    {
+        return data;
+    }
+
+    private static void delete(File file)
+    {
+        try
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Deleting {}", file);
+
+            Files.delete(file.toPath());
+        }
+        catch (NoSuchFileException e)
+        {
+            logger.error("Unable to delete {} as it does not exist", file);
+        }
+        catch (IOException e)
+        {
+            logger.error("Unable to delete {}", file, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * The transaction tidier.
+     *
+     * When the transaction reference is fully released we try to delete all the obsolete files
+     * depending on the transaction result, as well as the transaction log file.
+     */
+    private static class TransactionTidier implements RefCounted.Tidy, Runnable
+    {
+        private final TransactionData data;
+
+        public TransactionTidier(TransactionData data)
+        {
+            this.data = data;
+        }
+
+        public void tidy() throws Exception
+        {
+            run();
+        }
+
+        public String name()
+        {
+            return data.toString();
+        }
+
+        public void run()
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Removing files for transaction {}", name());
+
+            assert data.completed() : "Expected a completed transaction: " + data;
+
+            Throwable err = data.removeUnfinishedLeftovers(null);
+
+            if (err != null)
+            {
+                logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
+                failedDeletions.add(this);
+            }
+            else
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Closing file transaction {}", name());
+                data.close();
+            }
+        }
+    }
+
+    static class Obsoletion
+    {
+        final SSTableReader reader;
+        final SSTableTidier tidier;
+
+        public Obsoletion(SSTableReader reader, SSTableTidier tidier)
+        {
+            this.reader = reader;
+            this.tidier = tidier;
+        }
+    }
+
+    /**
+     * The SSTableReader tidier. When a reader is fully released and no longer referenced
+     * by anyone, we run this. It keeps a reference to the parent transaction and releases
+     * it when done, so that the final transaction cleanup can run when all obsolete readers
+     * are released.
+     */
+    public static class SSTableTidier implements Runnable
+    {
+        // must not retain a reference to the SSTableReader, else leak detection cannot kick in
+        private final Descriptor desc;
+        private final long sizeOnDisk;
+        private final Tracker tracker;
+        private final boolean wasNew;
+        private final Ref<TransactionLog> parentRef;
+
+        public SSTableTidier(SSTableReader referent, boolean wasNew, TransactionLog parent)
+        {
+            this.desc = referent.descriptor;
+            this.sizeOnDisk = referent.bytesOnDisk();
+            this.tracker = parent.tracker;
+            this.wasNew = wasNew;
+            this.parentRef = parent.selfRef.tryRef();
+        }
+
+        public void run()
+        {
+            SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+
+            try
+            {
+                // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
+                File datafile = new File(desc.filenameFor(Component.DATA));
+
+                delete(datafile);
+                // let the remainder be cleaned up by delete
+                SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
+            }
+            catch (Throwable t)
+            {
+                logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
+                failedDeletions.add(this);
+                return;
+            }
+
+            if (tracker != null && tracker.cfstore != null && !wasNew)
+                tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
+
+            // release the reference to the parent so that all the transaction files can be released
+            parentRef.release();
+        }
+
+        public void abort()
+        {
+            parentRef.release();
+        }
+    }
+
+    /**
+     * Retry all deletions that failed the first time around (presumably because the sstable was still mmap'd).
+     * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
+     */
+    public static void rescheduleFailedDeletions()
+    {
+        Runnable task;
+        while (null != (task = failedDeletions.poll()))
+            ScheduledExecutors.nonPeriodicTasks.submit(task);
+    }
+
+    /**
+     * Deletions run on the nonPeriodicTasks executor, (both failedDeletions or global tidiers in SSTableReader)
+     * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
+     */
+    public static void waitForDeletions()
+    {
+        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS));
+    }
+
+    @VisibleForTesting
+    Throwable complete(Throwable accumulate)
+    {
+        try
+        {
+            accumulate = selfRef.ensureReleased(accumulate);
+            return accumulate;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to complete file transaction {}", getId(), t);
+            return Throwables.merge(accumulate, t);
+        }
+    }
+
+    protected Throwable doCommit(Throwable accumulate)
+    {
+        data.file.commit();
+        return complete(accumulate);
+    }
+
+    protected Throwable doAbort(Throwable accumulate)
+    {
+        data.file.abort();
+        return complete(accumulate);
+    }
+
+    protected void doPrepare() { }
+
+    /**
+     * Called on startup to scan existing folders for any unfinished leftovers of
+     * operations that were ongoing when the process exited. Also called by the standalone
+     * sstableutil tool when the cleanup option is specified, see StandaloneSSTableUtil.
+     *
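+     * A sketch of a call site, assuming a ColumnFamilyStore {@code cfs} is in scope:
+     * <pre>{@code TransactionLog.removeUnfinishedLeftovers(cfs.metadata); }</pre>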
+     */
+    static void removeUnfinishedLeftovers(CFMetaData metadata)
+    {
+        Throwable accumulate = null;
+
+        for (File dir : new Directories(metadata).getCFDirectories())
+        {
+            // listFiles() returns null if the directory vanished or an I/O error occurred
+            File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+            if (logs == null)
+                continue;
+
+            for (File log : logs)
+            {
+                try (TransactionData data = TransactionData.make(log))
+                {
+                    accumulate = data.readLogFile(accumulate);
+                    if (accumulate == null)
+                        accumulate = data.removeUnfinishedLeftovers(accumulate);
+                    else
+                        logger.error("Possible disk corruption: failed to read transaction log {}", log, accumulate);
+                }
+            }
+        }
+
+        if (accumulate != null)
+            logger.error("Failed to remove unfinished transaction leftovers", accumulate);
+    }
+
+    /**
+     * A class for listing files in a folder. If we fail we try a few more times
+     * in case we are reading txn log files that are still being mutated.
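+     * <p>
+     * Illustrative usage, assuming a File {@code folder} (getTemporaryFiles at the
+     * bottom of this file is a real caller):
+     * <pre>{@code
+     * List<File> files = new FileLister(folder.toPath(),
+     *                                   (file, type) -> type != FileType.FINAL,
+     *                                   OnTxnErr.IGNORE).list();
+     * }</pre>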
+     */
+    static final class FileLister
+    {
+        // The maximum number of attempts for scanning the folder
+        private static final int MAX_ATTEMPTS = 5;
+
+        // The delay between each attempt
+        private static final int REATTEMPT_DELAY_MILLIS = 5;
+
+        // The folder to scan
+        private final Path folder;
+
+        // The filter determines which files the client wants returned; we pass
+        // each file and its type to the filter
+        private final BiFunction<File, FileType, Boolean> filter;
+
+        // This determines the behavior when we still fail to read a txn log file after MAX_ATTEMPTS tries
+        private final OnTxnErr onTxnErr;
+
+        // Each time we scan the folder we increment this counter; we scan at most MAX_ATTEMPTS times
+        private int attempts;
+
+        public FileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr)
+        {
+            this.folder = folder;
+            this.filter = filter;
+            this.onTxnErr = onTxnErr;
+            this.attempts = 0;
+        }
+
+        public List<File> list()
+        {
+            while (true)
+            {
+                try
+                {
+                    return attemptList();
+                }
+                catch (Throwable t)
+                {
+                    if (attempts >= MAX_ATTEMPTS)
+                        throw new RuntimeException(String.format("Failed to list files in %s after multiple attempts, giving up", folder), t);
+
+                    logger.warn("Failed to list files in {} : {}", folder, t.getMessage());
+                    try
+                    {
+                        Thread.sleep(REATTEMPT_DELAY_MILLIS);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        Thread.currentThread().interrupt(); // restore the interrupt status before bailing out
+                        logger.error("Interrupted whilst waiting to reattempt listing files in {}, giving up", folder, e);
+                        throw new RuntimeException(String.format("Failed to list files in %s due to interruption, giving up", folder), t);
+                    }
+                }
+            }
+        }
+
+        List<File> attemptList() throws IOException
+        {
+            attempts++;
+
+            Map<File, FileType> files = new HashMap<>();
+            try (DirectoryStream<Path> in = Files.newDirectoryStream(folder))
+            {
+                if (!(in instanceof SecureDirectoryStream))
+                    noSpamLogger.error("This platform does not support atomic directory streams (SecureDirectoryStream); " +
+                                       "race conditions when loading sstable files could occurr");
+
+                in.forEach(path ->
+                           {
+                               File file = path.toFile();
+                               if (file.isDirectory())
+                                   return;
+
+                               if (TransactionData.isLogFile(file.getName()))
+                               {
+                                   Set<File> tmpFiles = getTemporaryFiles(file);
+                                   if (tmpFiles != null)
+                                   { // process the txn log file only if we can read it (tmpFiles != null)
+                                       tmpFiles.forEach((f) -> files.put(f, FileType.TEMPORARY));
+                                       files.put(file, FileType.TXN_LOG);
+                                   }
+                               }
+                               else
+                               {
+                                   files.putIfAbsent(file, FileType.FINAL);
+                               }
+                           });
+            }
+
+            return files.entrySet().stream()
+                        .filter((e) -> filter.apply(e.getKey(), e.getValue()))
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+        }
+
+        Set<File> getTemporaryFiles(File file)
+        {
+            try (TransactionData txn = TransactionData.make(file))
+            {
+                maybeFail(txn.readLogFile(null));
+                return txn.getTemporaryFiles();
+            }
+            catch (Throwable t)
+            {
+                // We always fail if onTxnErr is set to THROW or if we haven't yet
+                // reached the maximum number of attempts. Otherwise we just log an
+                // error and continue as if the txn log file did not exist;
+                // clients choose which behavior they want via onTxnErr
+                if (attempts < MAX_ATTEMPTS ||
+                    onTxnErr == OnTxnErr.THROW)
+                    throw new RuntimeException(t);
+
+                logger.error("Failed to read temporary files of txn log {}", file, t);
+                return null; // txn.getTemporaryFiles() could be empty so we must use null to differentiate
+            }
+        }
+    }
+
+    @VisibleForTesting
+    static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
+    {
+        Set<File> ret = new HashSet<>();
+
+        List<File> directories = new Directories(metadata).getCFDirectories();
+        directories.add(folder);
+        for (File dir : directories)
+            ret.addAll(new FileLister(dir.toPath(),
+                                      (file, type) -> type != FileType.FINAL,
+                                      OnTxnErr.IGNORE).list());
+
+        return ret;
+    }
+}