You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/03/10 21:43:21 UTC

cassandra git commit: Abort startup and print txn log info when corrupted

Repository: cassandra
Updated Branches:
  refs/heads/trunk 42105ae5b -> 11910c6c9


Abort startup and print txn log info when corrupted

Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for
CASSANDRA-10112


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/11910c6c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/11910c6c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/11910c6c

Branch: refs/heads/trunk
Commit: 11910c6c9206407c2de60f38566120bddde79eba
Parents: 42105ae
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Mar 10 14:42:12 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Thu Mar 10 14:43:01 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 NEWS.txt                                        |  5 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 10 ++-
 .../db/lifecycle/LifecycleTransaction.java      |  4 +-
 .../apache/cassandra/db/lifecycle/LogFile.java  | 78 ++++++++++++--------
 .../cassandra/db/lifecycle/LogRecord.java       | 13 ++--
 .../cassandra/db/lifecycle/LogReplica.java      | 76 +++++++++++++++----
 .../cassandra/db/lifecycle/LogReplicaSet.java   | 61 ++++++++++-----
 .../cassandra/db/lifecycle/LogTransaction.java  | 74 ++++++++++++-------
 .../cassandra/exceptions/StartupException.java  |  4 +
 .../cassandra/service/CassandraDaemon.java      | 11 ++-
 .../apache/cassandra/service/StartupChecks.java | 28 ++++---
 .../db/lifecycle/LogTransactionTest.java        |  9 ++-
 13 files changed, 256 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aad9834..3682647 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.6
+ * Refuse to start and print txn log information in case of disk
+   corruption (CASSANDRA-10112)
  * Resolve some eclipse-warnings (CASSANDRA-11086)
  * (cqlsh) Show static columns in a different color (CASSANDRA-11059)
  * Allow to remove TTLs on table with default_time_to_live (CASSANDRA-11207)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 74490a8..cc3e9c2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,8 +18,11 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
-   - for tables having a default_time_to_live specifying a TTL of 0 will remove the TTL
+   - For tables having a default_time_to_live, specifying a TTL of 0 will remove the TTL
      from the inserted or updated values.
+   - Startup is now aborted if corrupted transaction log files are found. The details
+     of the affected log files are now logged, allowing the operator to decide how
+     to resolve the situation.
 
 3.4
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3b5e745..12a5f62 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.StartupException;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
@@ -581,7 +582,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files
      * and compacted sstables. Files that cannot be recognized will be ignored.
      */
-    public static void scrubDataDirectories(CFMetaData metadata)
+    public static void scrubDataDirectories(CFMetaData metadata) throws StartupException
     {
         Directories directories = new Directories(metadata);
 
@@ -589,7 +590,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         clearEphemeralSnapshots(directories);
 
         logger.trace("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
-        LifecycleTransaction.removeUnfinishedLeftovers(metadata);
+        if (!LifecycleTransaction.removeUnfinishedLeftovers(metadata))
+            throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+                                       String.format("Cannot remove temporary or obsoleted files for %s.%s due to a problem with transaction " +
+                                                     "log files. Please check records with problems in the log messages above and fix them. " +
+                                                     "Refer to the 3.0 upgrading instructions in NEWS.txt " +
+                                                     "for a description of transaction log files.", metadata.ksName, metadata.cfName));
 
         logger.trace("Further extra check for orphan sstable files for {}", metadata.cfName);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index a5eb01f..7ce4a08 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -522,9 +522,9 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         log.untrackNew(table);
     }
 
-    public static void removeUnfinishedLeftovers(CFMetaData metadata)
+    public static boolean removeUnfinishedLeftovers(CFMetaData metadata)
     {
-        LogTransaction.removeUnfinishedLeftovers(metadata);
+        return LogTransaction.removeUnfinishedLeftovers(metadata);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 9064e5f..3074842 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -73,9 +73,9 @@ final class LogFile
         return new LogFile(operationType, id, logReplicas);
     }
 
-    Throwable syncFolder(Throwable accumulate)
+    Throwable syncDirectory(Throwable accumulate)
     {
-        return replicas.syncFolder(accumulate);
+        return replicas.syncDirectory(accumulate);
     }
 
     OperationType type()
@@ -94,9 +94,9 @@ final class LogFile
         {
             deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
 
-            // we sync the parent folders between contents and log deletion
+            // we sync the parent directories between contents and log deletion
             // to ensure there is a happens before edge between them
-            Throwables.maybeFail(syncFolder(accumulate));
+            Throwables.maybeFail(syncDirectory(accumulate));
 
             accumulate = replicas.delete(accumulate);
         }
@@ -130,7 +130,7 @@ final class LogFile
         records.clear();
         if (!replicas.readRecords(records))
         {
-            logger.error("Failed to read records from {}", replicas);
+            logger.error("Failed to read records for transaction log {}", this);
             return false;
         }
 
@@ -143,7 +143,7 @@ final class LogFile
         LogRecord failedOn = firstInvalid.get();
         if (getLastRecord() != failedOn)
         {
-            logError(failedOn);
+            setErrorInReplicas(failedOn);
             return false;
         }
 
@@ -151,10 +151,10 @@ final class LogFile
         if (records.stream()
                    .filter((r) -> r != failedOn)
                    .filter(LogRecord::isInvalid)
-                   .map(LogFile::logError)
+                   .map(this::setErrorInReplicas)
                    .findFirst().isPresent())
         {
-            logError(failedOn);
+            setErrorInReplicas(failedOn);
             return false;
         }
 
@@ -167,9 +167,9 @@ final class LogFile
         return true;
     }
 
-    static LogRecord logError(LogRecord record)
+    LogRecord setErrorInReplicas(LogRecord record)
     {
-        logger.error("{}", record.error());
+        replicas.setErrorInReplicas(record);
         return record;
     }
 
@@ -177,9 +177,8 @@ final class LogFile
     {
         if (record.checksum != record.computeChecksum())
         {
-            record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]",
+            record.setError(String.format("Invalid checksum for sstable [%s]: [%d] should have been [%d]",
                                           record.fileName(),
-                                          record,
                                           record.checksum,
                                           record.computeChecksum()));
             return;
@@ -197,10 +196,9 @@ final class LogFile
         record.status.onDiskRecord = record.withExistingFiles();
         if (record.updateTime != record.status.onDiskRecord.updateTime && record.status.onDiskRecord.numFiles > 0)
         {
-            record.setError(String.format("Unexpected files detected for sstable [%s], " +
-                                          "record [%s]: last update time [%tT] should have been [%tT]",
+            record.setError(String.format("Unexpected files detected for sstable [%s]: " +
+                                          "last update time [%tT] should have been [%tT]",
                                           record.fileName(),
-                                          record,
                                           record.status.onDiskRecord.updateTime,
                                           record.updateTime));
 
@@ -212,11 +210,9 @@ final class LogFile
         if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles < record.numFiles)
         { // if we found a corruption in the last record, then we continue only
           // if the number of files matches exactly for all previous records.
-            record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " +
-                                          "number of files [%d] should have been [%d]. Treating as unrecoverable " +
-                                          "due to corruption of the final record.",
+            record.setError(String.format("Incomplete fileset detected for sstable [%s]: " +
+                                          "number of files [%d] should have been [%d].",
                                           record.fileName(),
-                                          record.raw,
                                           record.status.onDiskRecord.numFiles,
                                           record.numFiles));
         }
@@ -267,8 +263,9 @@ final class LogFile
     {
         assert type == Type.ADD || type == Type.REMOVE;
 
-        File folder = table.descriptor.directory;
-        replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        File directory = table.descriptor.directory;
+        String fileName = StringUtils.join(directory, File.separator, getFileName());
+        replicas.maybeCreateReplica(directory, fileName, records);
         return LogRecord.make(type, table);
     }
 
@@ -351,7 +348,25 @@ final class LogFile
     @Override
     public String toString()
     {
-        return replicas.toString();
+        return toString(false);
+    }
+
+    public String toString(boolean showContents)
+    {
+        StringBuilder str = new StringBuilder();
+        str.append('[');
+        str.append(getFileName());
+        str.append(" in ");
+        str.append(replicas.getDirectories());
+        str.append(']');
+        if (showContents)
+        {
+            str.append(System.lineSeparator());
+            str.append("Files and contents follow:");
+            str.append(System.lineSeparator());
+            replicas.printContentsWithAnyErrors(str);
+        }
+        return str.toString();
     }
 
     @VisibleForTesting
@@ -366,16 +381,15 @@ final class LogFile
         return replicas.getFilePaths();
     }
 
-    private String getFileName(File folder)
+    private String getFileName()
     {
-        String fileName = StringUtils.join(BigFormat.latestVersion,
-                                           LogFile.SEP,
-                                           "txn",
-                                           LogFile.SEP,
-                                           type.fileName,
-                                           LogFile.SEP,
-                                           id.toString(),
-                                           LogFile.EXT);
-        return StringUtils.join(folder, File.separator, fileName);
+        return StringUtils.join(BigFormat.latestVersion,
+                                LogFile.SEP,
+                                "txn",
+                                LogFile.SEP,
+                                type.fileName,
+                                LogFile.SEP,
+                                id.toString(),
+                                LogFile.EXT);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 9e606fc..b2c7038 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -105,11 +105,13 @@ final class LogRecord
                                  matcher.group(2),
                                  Long.valueOf(matcher.group(3)),
                                  Integer.valueOf(matcher.group(4)),
-                                 Long.valueOf(matcher.group(5)), line);
+                                 Long.valueOf(matcher.group(5)),
+                                 line);
         }
-        catch (Throwable t)
+        catch (IllegalArgumentException e)
         {
-            return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t);
+            return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
+                   .setError(String.format("Failed to parse line: %s", e.getMessage()));
         }
     }
 
@@ -180,11 +182,6 @@ final class LogRecord
         }
     }
 
-    LogRecord setError(Throwable t)
-    {
-        return setError(t.getMessage());
-    }
-
     LogRecord setError(String error)
     {
         status.setError(error);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
index 79b9749..da90f88 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.CLibrary;
@@ -26,7 +29,7 @@ import org.apache.cassandra.utils.CLibrary;
 /**
  * Because a column family may have sstables on different disks and disks can
  * be removed, we duplicate log files into many replicas so as to have a file
- * in each folder where sstables exist.
+ * in each directory where sstables exist.
  *
  * Each replica contains the exact same content but we do allow for final
  * partial records in case we crashed after writing to one replica but
@@ -37,11 +40,12 @@ import org.apache.cassandra.utils.CLibrary;
 final class LogReplica
 {
     private final File file;
-    private int folderDescriptor;
+    private int directoryDescriptor;
+    private final Map<String, String> errors = new HashMap<>();
 
-    static LogReplica create(File folder, String fileName)
+    static LogReplica create(File directory, String fileName)
     {
-        return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(folder.getPath()));
+        return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(directory.getPath()));
     }
 
     static LogReplica open(File file)
@@ -49,10 +53,10 @@ final class LogReplica
         return new LogReplica(file, CLibrary.tryOpenDirectory(file.getParentFile().getPath()));
     }
 
-    LogReplica(File file, int folderDescriptor)
+    LogReplica(File file, int directoryDescriptor)
     {
         this.file = file;
-        this.folderDescriptor = folderDescriptor;
+        this.directoryDescriptor = directoryDescriptor;
     }
 
     File file()
@@ -60,27 +64,42 @@ final class LogReplica
         return file;
     }
 
+    List<String> readLines()
+    {
+        return FileUtils.readLines(file);
+    }
+
+    String getFileName()
+    {
+        return file.getName();
+    }
+
+    String getDirectory()
+    {
+        return file.getParent();
+    }
+
     void append(LogRecord record)
     {
         boolean existed = exists();
         FileUtils.appendAndSync(file, record.toString());
 
         // If the file did not exist before appending the first
-        // line, then sync the folder as well since now it must exist
+        // line, then sync the directory as well since now it must exist
         if (!existed)
-            syncFolder();
+            syncDirectory();
     }
 
-    void syncFolder()
+    void syncDirectory()
     {
-        if (folderDescriptor >= 0)
-            CLibrary.trySync(folderDescriptor);
+        if (directoryDescriptor >= 0)
+            CLibrary.trySync(directoryDescriptor);
     }
 
     void delete()
     {
         LogTransaction.delete(file);
-        syncFolder();
+        syncDirectory();
     }
 
     boolean exists()
@@ -90,10 +109,10 @@ final class LogReplica
 
     void close()
     {
-        if (folderDescriptor >= 0)
+        if (directoryDescriptor >= 0)
         {
-            CLibrary.tryCloseFD(folderDescriptor);
-            folderDescriptor = -1;
+            CLibrary.tryCloseFD(directoryDescriptor);
+            directoryDescriptor = -1;
         }
     }
 
@@ -102,4 +121,31 @@ final class LogReplica
     {
         return String.format("[%s] ", file);
     }
+
+    void setError(String line, String error)
+    {
+        errors.put(line, error);
+    }
+
+    void printContentsWithAnyErrors(StringBuilder str)
+    {
+        str.append(file.getPath());
+        str.append(System.lineSeparator());
+        FileUtils.readLines(file).forEach(line -> printLineWithAnyError(str, line));
+    }
+
+    private void printLineWithAnyError(StringBuilder str, String line)
+    {
+        str.append('\t');
+        str.append(line);
+        str.append(System.lineSeparator());
+
+        String error = errors.get(line);
+        if (error != null)
+        {
+            str.append("\t\t***");
+            str.append(error);
+            str.append(System.lineSeparator());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index c557bf2..47a9901 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.Throwables;
 
 /**
@@ -60,31 +59,31 @@ public class LogReplicaSet
 
     void addReplica(File file)
     {
-        File folder = file.getParentFile();
-        assert !replicasByFile.containsKey(folder);
-        replicasByFile.put(folder, LogReplica.open(file));
+        File directory = file.getParentFile();
+        assert !replicasByFile.containsKey(directory);
+        replicasByFile.put(directory, LogReplica.open(file));
 
         if (logger.isTraceEnabled())
             logger.trace("Added log file replica {} ", file);
     }
 
-    void maybeCreateReplica(File folder, String fileName, Set<LogRecord> records)
+    void maybeCreateReplica(File directory, String fileName, Set<LogRecord> records)
     {
-        if (replicasByFile.containsKey(folder))
+        if (replicasByFile.containsKey(directory))
             return;
 
-        final LogReplica replica = LogReplica.create(folder, fileName);
+        final LogReplica replica = LogReplica.create(directory, fileName);
 
         records.forEach(replica::append);
-        replicasByFile.put(folder, replica);
+        replicasByFile.put(directory, replica);
 
         if (logger.isTraceEnabled())
             logger.trace("Created new file replica {}", replica);
     }
 
-    Throwable syncFolder(Throwable accumulate)
+    Throwable syncDirectory(Throwable accumulate)
     {
-        return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncFolder));
+        return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncDirectory));
     }
 
     Throwable delete(Throwable accumulate)
@@ -101,15 +100,18 @@ public class LogReplicaSet
 
     boolean readRecords(Set<LogRecord> records)
     {
-        Map<File, List<String>> linesByReplica = replicas().stream()
-                                                           .map(LogReplica::file)
-                                                           .collect(Collectors.toMap(Function.<File>identity(), FileUtils::readLines));
+        Map<LogReplica, List<String>> linesByReplica = replicas().stream()
+                                                                 .collect(Collectors.toMap(Function.<LogReplica>identity(),
+                                                                                           LogReplica::readLines,
+                                                                                           (k, v) -> {throw new IllegalStateException("Duplicated key: " + k);},
+                                                                                           LinkedHashMap::new));
+
         int maxNumLines = linesByReplica.values().stream().map(List::size).reduce(0, Integer::max);
         for (int i = 0; i < maxNumLines; i++)
         {
             String firstLine = null;
             boolean partial = false;
-            for (Map.Entry<File, List<String>> entry : linesByReplica.entrySet())
+            for (Map.Entry<LogReplica, List<String>> entry : linesByReplica.entrySet())
             {
                 List<String> currentLines = entry.getValue();
                 if (i >= currentLines.size())
@@ -125,9 +127,10 @@ public class LogReplicaSet
                 if (!isPrefixMatch(firstLine, currentLine))
                 { // not a prefix match
                     logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up",
-                                 entry.getKey().getName(),
+                                 entry.getKey().getFileName(),
                                  currentLine,
                                  firstLine);
+                    entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine));
                     return false;
                 }
 
@@ -136,7 +139,7 @@ public class LogReplicaSet
                     if (i == currentLines.size() - 1)
                     { // last record, just set record as invalid and move on
                         logger.warn("Mismatched last line in file {}: '{}' not the same as '{}'",
-                                    entry.getKey().getName(),
+                                    entry.getKey().getFileName(),
                                     currentLine,
                                     firstLine);
 
@@ -148,9 +151,10 @@ public class LogReplicaSet
                     else
                     {   // mismatched entry file has more lines, giving up
                         logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up",
-                                     entry.getKey().getName(),
+                                     entry.getKey().getFileName(),
                                      currentLine,
                                      firstLine);
+                        entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine));
                         return false;
                     }
                 }
@@ -160,6 +164,7 @@ public class LogReplicaSet
             if (records.contains(record))
             { // duplicate records
                 logger.error("Found duplicate record {} for {}, giving up", record, record.fileName());
+                setError(record, "Duplicated record");
                 return false;
             }
 
@@ -171,6 +176,7 @@ public class LogReplicaSet
             if (record.isFinal() && i != (maxNumLines - 1))
             { // too many final records
                 logger.error("Found too many lines for {}, giving up", record.fileName());
+                setError(record, "This record should have been the last one in all replicas");
                 return false;
             }
         }
@@ -178,6 +184,22 @@ public class LogReplicaSet
         return true;
     }
 
+    void setError(LogRecord record, String error)
+    {
+        record.setError(error);
+        setErrorInReplicas(record);
+    }
+
+    void setErrorInReplicas(LogRecord record)
+    {
+        replicas().forEach(r -> r.setError(record.raw, record.error()));
+    }
+
+    void printContentsWithAnyErrors(StringBuilder str)
+    {
+        replicas().forEach(r -> r.printContentsWithAnyErrors(str));
+    }
+
     /**
      *  Add the record to all the replicas: if it is a final record then we throw only if we fail to write it
      *  to all, otherwise we throw if we fail to write it to any file, see CASSANDRA-10421 for details
@@ -216,6 +238,11 @@ public class LogReplicaSet
                : "[-]";
     }
 
+    String getDirectories()
+    {
+        return String.join(", ", replicas().stream().map(LogReplica::getDirectory).collect(Collectors.toList()));
+    }
+
     @VisibleForTesting
     List<File> getFiles()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index ce76165..b441454 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -24,6 +24,7 @@ import java.nio.file.NoSuchFileException;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Runnables;
@@ -55,7 +56,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
  * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
  * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure
  * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
- * outside of LT. @see FileLister.classifyFiles(TransactionData txn)
+ * outside of LT. @see LogAwareFileLister.classifyFiles()
  *
  * A class that tracks sstable files involved in a transaction across sstables:
  * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
@@ -67,8 +68,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
  *
  * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
  * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
- * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
- * and file sizes.
+ * last update time of all files for the sstable descriptor and the number of sstable files.
  *
  * Upon commit we add a final line to the log file:
  *
@@ -238,27 +238,29 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
         public void run()
         {
             if (logger.isTraceEnabled())
-                logger.trace("Removing files for transaction {}", name());
+                logger.trace("Removing files for transaction log {}", data);
 
             if (!data.completed())
             { // this happens if we forget to close a txn and the garbage collector closes it for us
-                logger.error("{} was not completed, trying to abort it now", data);
+                logger.error("Transaction log {} indicates txn was not completed, trying to abort it now", data);
                 Throwable err = Throwables.perform((Throwable)null, data::abort);
                 if (err != null)
-                    logger.error("Failed to abort {}", data, err);
+                    logger.error("Failed to abort transaction log {}", data, err);
             }
 
             Throwable err = data.removeUnfinishedLeftovers(null);
 
             if (err != null)
             {
-                logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
+                logger.info("Failed deleting files for transaction log {}, we'll retry after GC and on on server restart",
+                            data,
+                            err);
                 failedDeletions.add(this);
             }
             else
             {
                 if (logger.isTraceEnabled())
-                    logger.trace("Closing file transaction {}", name());
+                    logger.trace("Closing transaction log {}", data);
 
                 data.close();
             }
@@ -360,7 +362,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
         }
         catch (Throwable t)
         {
-            logger.error("Failed to complete file transaction {}", id(), t);
+            logger.error("Failed to complete file transaction id {}", id(), t);
             return Throwables.merge(accumulate, t);
         }
     }
@@ -378,31 +380,43 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
     protected void doPrepare() { }
 
     /**
-     * Called on startup to scan existing folders for any unfinished leftovers of
-     * operations that were ongoing when the process exited. Also called by the standalone
-     * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
+     * Removes any leftovers from unfinished transactions as indicated by any transaction log files that
+     * are found in the table directories. This means that any old sstable files for transactions that were committed,
+     * or any new sstable files for transactions that were aborted or still in progress, should be removed *if
+     * it is safe to do so*. Refer to the checks in LogFile.verify for further details on the safety checks
+     * before removing transaction leftovers and refer to the comments at the beginning of this file or in NEWS.txt
+     * for further details on transaction logs.
+     *
+     * This method is called on startup and by the standalone sstableutil tool when the cleanup option is specified,
+     * @see StandaloneSSTableUtil.
+     *
+     * @return true if the leftovers of all transaction logs found were removed, false otherwise.
      *
      */
-    static void removeUnfinishedLeftovers(CFMetaData metadata)
+    static boolean removeUnfinishedLeftovers(CFMetaData metadata)
     {
-        removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
+        return removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
     }
 
     @VisibleForTesting
-    static void removeUnfinishedLeftovers(List<File> folders)
+    static boolean removeUnfinishedLeftovers(List<File> directories)
     {
         LogFilesByName logFiles = new LogFilesByName();
-        folders.forEach(logFiles::list);
-        logFiles.removeUnfinishedLeftovers();
+        directories.forEach(logFiles::list);
+        return logFiles.removeUnfinishedLeftovers();
     }
 
     private static final class LogFilesByName
     {
+        // This maps a transaction log file name to a list of physical files. Each sstable
+        // can have multiple directories and a transaction is tracked by identical transaction log
+        // files, one per directory. So for each transaction file name we can have multiple
+        // physical files.
         Map<String, List<File>> files = new HashMap<>();
 
-        void list(File folder)
+        void list(File directory)
         {
-            Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add);
+            Arrays.stream(directory.listFiles(LogFile::isLogFile)).forEach(this::add);
         }
 
         void add(File file)
@@ -417,25 +431,35 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
             filesByName.add(file);
         }
 
-        void removeUnfinishedLeftovers()
+        boolean removeUnfinishedLeftovers()
         {
-            files.forEach(LogFilesByName::removeUnfinishedLeftovers);
+            return files.entrySet()
+                        .stream()
+                        .map(LogFilesByName::removeUnfinishedLeftovers)
+                        .allMatch(Predicate.isEqual(true));
         }
 
-        static void removeUnfinishedLeftovers(String name, List<File> logFiles)
+        static boolean removeUnfinishedLeftovers(Map.Entry<String, List<File>> entry)
         {
-            LogFile txn = LogFile.make(name, logFiles);
+            LogFile txn = LogFile.make(entry.getKey(), entry.getValue());
             try
             {
                 if (txn.verify())
                 {
                     Throwable failure = txn.removeUnfinishedLeftovers(null);
                     if (failure != null)
-                        logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
+                    {
+                        logger.error("Failed to remove unfinished transaction leftovers for transaction log {}",
+                                     txn.toString(true), failure);
+                        return false;
+                    }
+
+                    return true;
                 }
                 else
                 {
-                    logger.error("Unexpected disk state: failed to read transaction txn {}", txn);
+                    logger.error("Unexpected disk state: failed to read transaction log {}", txn.toString(true));
+                    return false;
                 }
             }
             finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/exceptions/StartupException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/StartupException.java b/src/java/org/apache/cassandra/exceptions/StartupException.java
index ec4890f..1513cf9 100644
--- a/src/java/org/apache/cassandra/exceptions/StartupException.java
+++ b/src/java/org/apache/cassandra/exceptions/StartupException.java
@@ -23,6 +23,10 @@ package org.apache.cassandra.exceptions;
  */
 public class StartupException extends Exception
 {
+    public final static int ERR_WRONG_MACHINE_STATE = 1;
+    public final static int ERR_WRONG_DISK_STATE = 3;
+    public final static int ERR_WRONG_CONFIG = 100;
+
     public final int returnCode;
 
     public StartupException(int returnCode, String message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 183abf8..b84a5e3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -236,7 +236,16 @@ public class CassandraDaemon
                 continue;
 
             for (CFMetaData cfm : Schema.instance.getTablesAndViews(keyspaceName))
-                ColumnFamilyStore.scrubDataDirectories(cfm);
+            {
+                try
+                {
+                    ColumnFamilyStore.scrubDataDirectories(cfm);
+                }
+                catch (StartupException e)
+                {
+                    exitOrFail(e.returnCode, e.getMessage(), e.getCause());
+                }
+            }
         }
 
         Keyspace.setInitialized();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index e903721..7c6c91a 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -134,8 +134,9 @@ public class StartupChecks
         {
             long now = System.currentTimeMillis();
             if (now < EARLIEST_LAUNCH_DATE)
-                throw new StartupException(1, String.format("current machine time is %s, but that is seemingly incorrect. exiting now.",
-                                                            new Date(now).toString()));
+                throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE,
+                                           String.format("current machine time is %s, but that is seemingly incorrect. exiting now.",
+                                                         new Date(now).toString()));
         }
     };
 
@@ -186,7 +187,7 @@ public class StartupChecks
         {
             // Fail-fast if JNA is not available or failing to initialize properly
             if (!CLibrary.jnaAvailable())
-                throw new StartupException(3, "JNA failing to initialize properly. ");
+                throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE, "JNA failing to initialize properly. ");
         }
     };
 
@@ -216,12 +217,14 @@ public class StartupChecks
                 logger.warn("Directory {} doesn't exist", dataDir);
                 // if they don't, failing their creation, stop cassandra.
                 if (!dir.mkdirs())
-                    throw new StartupException(3, "Has no permission to create directory "+ dataDir);
+                    throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+                                               "Has no permission to create directory "+ dataDir);
             }
 
             // if directories exist verify their permissions
             if (!Directories.verifyFullPermissions(dir, dataDir))
-                throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
+                throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+                                           "Insufficient permissions on directory " + dataDir);
         }
     };
 
@@ -272,11 +275,12 @@ public class StartupChecks
             }
 
             if (!invalid.isEmpty())
-                throw new StartupException(3, String.format("Detected unreadable sstables %s, please check " +
-                                                            "NEWS.txt and ensure that you have upgraded through " +
-                                                            "all required intermediate versions, running " +
-                                                            "upgradesstables",
-                                                            Joiner.on(",").join(invalid)));
+                throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+                                           String.format("Detected unreadable sstables %s, please check " +
+                                                         "NEWS.txt and ensure that you have upgraded through " +
+                                                         "all required intermediate versions, running " +
+                                                         "upgradesstables",
+                                                         Joiner.on(",").join(invalid)));
 
         }
     };
@@ -318,7 +322,7 @@ public class StartupChecks
                         String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " +
                                                "Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_dc=true.";
 
-                        throw new StartupException(100, String.format(formatMessage, currentDc, storedDc));
+                        throw new StartupException(StartupException.ERR_WRONG_CONFIG, String.format(formatMessage, currentDc, storedDc));
                     }
                 }
             }
@@ -340,7 +344,7 @@ public class StartupChecks
                         String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " +
                                                "Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_rack=true.";
 
-                        throw new StartupException(100, String.format(formatMessage, currentRack, storedRack));
+                        throw new StartupException(StartupException.ERR_WRONG_CONFIG, String.format(formatMessage, currentRack, storedRack));
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 4f2fc73..59958bb 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -519,7 +519,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
                             LogAwareFileLister.getTemporaryFiles(dataFolder2));
 
         // normally called at startup
-        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+        assertTrue(LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
 
         // new tables should be only table left
         assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths()));
@@ -570,7 +570,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
                             LogAwareFileLister.getTemporaryFiles(dataFolder2));
 
         // normally called at startup
-        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+        assertTrue(LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
 
         // old tables should be only table left
         assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths()));
@@ -735,7 +735,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
 
         Arrays.stream(sstables).forEach(s -> s.selfRef().release());
 
-        LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+        // if shouldCommit is true then it should remove the leftovers and return true, false otherwise
+        assertEquals(shouldCommit, LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
         LogTransaction.waitForDeletions();
 
         if (shouldCommit)
@@ -862,7 +863,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
                                   if (filePath.endsWith("Data.db"))
                                   {
                                       assertTrue(FileUtils.delete(filePath));
-                                      assertNull(t.txnFile().syncFolder(null));
+                                      assertNull(t.txnFile().syncDirectory(null));
                                       break;
                                   }
                               }