You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/03/10 21:43:21 UTC
cassandra git commit: Abort startup and print txn log info when corrupted
Repository: cassandra
Updated Branches:
refs/heads/trunk 42105ae5b -> 11910c6c9
Abort startup and print txn log info when corrupted
Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for CASSANDRA-10112
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/11910c6c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/11910c6c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/11910c6c
Branch: refs/heads/trunk
Commit: 11910c6c9206407c2de60f38566120bddde79eba
Parents: 42105ae
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Mar 10 14:42:12 2016 -0600
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Thu Mar 10 14:43:01 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
NEWS.txt | 5 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 10 ++-
.../db/lifecycle/LifecycleTransaction.java | 4 +-
.../apache/cassandra/db/lifecycle/LogFile.java | 78 ++++++++++++--------
.../cassandra/db/lifecycle/LogRecord.java | 13 ++--
.../cassandra/db/lifecycle/LogReplica.java | 76 +++++++++++++++----
.../cassandra/db/lifecycle/LogReplicaSet.java | 61 ++++++++++-----
.../cassandra/db/lifecycle/LogTransaction.java | 74 ++++++++++++-------
.../cassandra/exceptions/StartupException.java | 4 +
.../cassandra/service/CassandraDaemon.java | 11 ++-
.../apache/cassandra/service/StartupChecks.java | 28 ++++---
.../db/lifecycle/LogTransactionTest.java | 9 ++-
13 files changed, 256 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index aad9834..3682647 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
3.6
+ * Refuse to start and print txn log information in case of disk
+ corruption (CASSANDRA-10112)
* Resolve some eclipse-warnings (CASSANDRA-11086)
* (cqlsh) Show static columns in a different color (CASSANDRA-11059)
* Allow to remove TTLs on table with default_time_to_live (CASSANDRA-11207)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 74490a8..cc3e9c2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,8 +18,11 @@ using the provided 'sstableupgrade' tool.
New features
------------
- - for tables having a default_time_to_live specifying a TTL of 0 will remove the TTL
+ - For tables having a default_time_to_live specifying a TTL of 0 will remove the TTL
from the inserted or updated values.
+ - Startup is now aborted if corrupted transaction log files are found. The details
+ of the affected log files are now logged, allowing the operator to decide how
+ to resolve the situation.
3.4
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3b5e745..12a5f62 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.UpdateTransaction;
@@ -581,7 +582,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files
* and compacted sstables. Files that cannot be recognized will be ignored.
*/
- public static void scrubDataDirectories(CFMetaData metadata)
+ public static void scrubDataDirectories(CFMetaData metadata) throws StartupException
{
Directories directories = new Directories(metadata);
@@ -589,7 +590,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
clearEphemeralSnapshots(directories);
logger.trace("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
- LifecycleTransaction.removeUnfinishedLeftovers(metadata);
+ if (!LifecycleTransaction.removeUnfinishedLeftovers(metadata))
+ throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+ String.format("Cannot remove temporary or obsoleted files for %s.%s due to a problem with transaction " +
+ "log files. Please check records with problems in the log messages above and fix them. " +
+ "Refer to the 3.0 upgrading instructions in NEWS.txt " +
+ "for a description of transaction log files.", metadata.ksName, metadata.cfName));
logger.trace("Further extra check for orphan sstable files for {}", metadata.cfName);
for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index a5eb01f..7ce4a08 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -522,9 +522,9 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
log.untrackNew(table);
}
- public static void removeUnfinishedLeftovers(CFMetaData metadata)
+ public static boolean removeUnfinishedLeftovers(CFMetaData metadata)
{
- LogTransaction.removeUnfinishedLeftovers(metadata);
+ return LogTransaction.removeUnfinishedLeftovers(metadata);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index 9064e5f..3074842 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -73,9 +73,9 @@ final class LogFile
return new LogFile(operationType, id, logReplicas);
}
- Throwable syncFolder(Throwable accumulate)
+ Throwable syncDirectory(Throwable accumulate)
{
- return replicas.syncFolder(accumulate);
+ return replicas.syncDirectory(accumulate);
}
OperationType type()
@@ -94,9 +94,9 @@ final class LogFile
{
deleteFilesForRecordsOfType(committed() ? Type.REMOVE : Type.ADD);
- // we sync the parent folders between contents and log deletion
+ // we sync the parent directories between contents and log deletion
// to ensure there is a happens before edge between them
- Throwables.maybeFail(syncFolder(accumulate));
+ Throwables.maybeFail(syncDirectory(accumulate));
accumulate = replicas.delete(accumulate);
}
@@ -130,7 +130,7 @@ final class LogFile
records.clear();
if (!replicas.readRecords(records))
{
- logger.error("Failed to read records from {}", replicas);
+ logger.error("Failed to read records for transaction log {}", this);
return false;
}
@@ -143,7 +143,7 @@ final class LogFile
LogRecord failedOn = firstInvalid.get();
if (getLastRecord() != failedOn)
{
- logError(failedOn);
+ setErrorInReplicas(failedOn);
return false;
}
@@ -151,10 +151,10 @@ final class LogFile
if (records.stream()
.filter((r) -> r != failedOn)
.filter(LogRecord::isInvalid)
- .map(LogFile::logError)
+ .map(this::setErrorInReplicas)
.findFirst().isPresent())
{
- logError(failedOn);
+ setErrorInReplicas(failedOn);
return false;
}
@@ -167,9 +167,9 @@ final class LogFile
return true;
}
- static LogRecord logError(LogRecord record)
+ LogRecord setErrorInReplicas(LogRecord record)
{
- logger.error("{}", record.error());
+ replicas.setErrorInReplicas(record);
return record;
}
@@ -177,9 +177,8 @@ final class LogFile
{
if (record.checksum != record.computeChecksum())
{
- record.setError(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]",
+ record.setError(String.format("Invalid checksum for sstable [%s]: [%d] should have been [%d]",
record.fileName(),
- record,
record.checksum,
record.computeChecksum()));
return;
@@ -197,10 +196,9 @@ final class LogFile
record.status.onDiskRecord = record.withExistingFiles();
if (record.updateTime != record.status.onDiskRecord.updateTime && record.status.onDiskRecord.numFiles > 0)
{
- record.setError(String.format("Unexpected files detected for sstable [%s], " +
- "record [%s]: last update time [%tT] should have been [%tT]",
+ record.setError(String.format("Unexpected files detected for sstable [%s]: " +
+ "last update time [%tT] should have been [%tT]",
record.fileName(),
- record,
record.status.onDiskRecord.updateTime,
record.updateTime));
@@ -212,11 +210,9 @@ final class LogFile
if (record.type == Type.REMOVE && record.status.onDiskRecord.numFiles < record.numFiles)
{ // if we found a corruption in the last record, then we continue only
// if the number of files matches exactly for all previous records.
- record.setError(String.format("Incomplete fileset detected for sstable [%s], record [%s]: " +
- "number of files [%d] should have been [%d]. Treating as unrecoverable " +
- "due to corruption of the final record.",
+ record.setError(String.format("Incomplete fileset detected for sstable [%s]: " +
+ "number of files [%d] should have been [%d].",
record.fileName(),
- record.raw,
record.status.onDiskRecord.numFiles,
record.numFiles));
}
@@ -267,8 +263,9 @@ final class LogFile
{
assert type == Type.ADD || type == Type.REMOVE;
- File folder = table.descriptor.directory;
- replicas.maybeCreateReplica(folder, getFileName(folder), records);
+ File directory = table.descriptor.directory;
+ String fileName = StringUtils.join(directory, File.separator, getFileName());
+ replicas.maybeCreateReplica(directory, fileName, records);
return LogRecord.make(type, table);
}
@@ -351,7 +348,25 @@ final class LogFile
@Override
public String toString()
{
- return replicas.toString();
+ return toString(false);
+ }
+
+ public String toString(boolean showContents)
+ {
+ StringBuilder str = new StringBuilder();
+ str.append('[');
+ str.append(getFileName());
+ str.append(" in ");
+ str.append(replicas.getDirectories());
+ str.append(']');
+ if (showContents)
+ {
+ str.append(System.lineSeparator());
+ str.append("Files and contents follow:");
+ str.append(System.lineSeparator());
+ replicas.printContentsWithAnyErrors(str);
+ }
+ return str.toString();
}
@VisibleForTesting
@@ -366,16 +381,15 @@ final class LogFile
return replicas.getFilePaths();
}
- private String getFileName(File folder)
+ private String getFileName()
{
- String fileName = StringUtils.join(BigFormat.latestVersion,
- LogFile.SEP,
- "txn",
- LogFile.SEP,
- type.fileName,
- LogFile.SEP,
- id.toString(),
- LogFile.EXT);
- return StringUtils.join(folder, File.separator, fileName);
+ return StringUtils.join(BigFormat.latestVersion,
+ LogFile.SEP,
+ "txn",
+ LogFile.SEP,
+ type.fileName,
+ LogFile.SEP,
+ id.toString(),
+ LogFile.EXT);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index 9e606fc..b2c7038 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -105,11 +105,13 @@ final class LogRecord
matcher.group(2),
Long.valueOf(matcher.group(3)),
Integer.valueOf(matcher.group(4)),
- Long.valueOf(matcher.group(5)), line);
+ Long.valueOf(matcher.group(5)),
+ line);
}
- catch (Throwable t)
+ catch (IllegalArgumentException e)
{
- return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line).setError(t);
+ return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
+ .setError(String.format("Failed to parse line: %s", e.getMessage()));
}
}
@@ -180,11 +182,6 @@ final class LogRecord
}
}
- LogRecord setError(Throwable t)
- {
- return setError(t.getMessage());
- }
-
LogRecord setError(String error)
{
status.setError(error);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
index 79b9749..da90f88 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
@@ -19,6 +19,9 @@
package org.apache.cassandra.db.lifecycle;
import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
@@ -26,7 +29,7 @@ import org.apache.cassandra.utils.CLibrary;
/**
* Because a column family may have sstables on different disks and disks can
* be removed, we duplicate log files into many replicas so as to have a file
- * in each folder where sstables exist.
+ * in each directory where sstables exist.
*
* Each replica contains the exact same content but we do allow for final
* partial records in case we crashed after writing to one replica but
@@ -37,11 +40,12 @@ import org.apache.cassandra.utils.CLibrary;
final class LogReplica
{
private final File file;
- private int folderDescriptor;
+ private int directoryDescriptor;
+ private final Map<String, String> errors = new HashMap<>();
- static LogReplica create(File folder, String fileName)
+ static LogReplica create(File directory, String fileName)
{
- return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(folder.getPath()));
+ return new LogReplica(new File(fileName), CLibrary.tryOpenDirectory(directory.getPath()));
}
static LogReplica open(File file)
@@ -49,10 +53,10 @@ final class LogReplica
return new LogReplica(file, CLibrary.tryOpenDirectory(file.getParentFile().getPath()));
}
- LogReplica(File file, int folderDescriptor)
+ LogReplica(File file, int directoryDescriptor)
{
this.file = file;
- this.folderDescriptor = folderDescriptor;
+ this.directoryDescriptor = directoryDescriptor;
}
File file()
@@ -60,27 +64,42 @@ final class LogReplica
return file;
}
+ List<String> readLines()
+ {
+ return FileUtils.readLines(file);
+ }
+
+ String getFileName()
+ {
+ return file.getName();
+ }
+
+ String getDirectory()
+ {
+ return file.getParent();
+ }
+
void append(LogRecord record)
{
boolean existed = exists();
FileUtils.appendAndSync(file, record.toString());
// If the file did not exist before appending the first
- // line, then sync the folder as well since now it must exist
+ // line, then sync the directory as well since now it must exist
if (!existed)
- syncFolder();
+ syncDirectory();
}
- void syncFolder()
+ void syncDirectory()
{
- if (folderDescriptor >= 0)
- CLibrary.trySync(folderDescriptor);
+ if (directoryDescriptor >= 0)
+ CLibrary.trySync(directoryDescriptor);
}
void delete()
{
LogTransaction.delete(file);
- syncFolder();
+ syncDirectory();
}
boolean exists()
@@ -90,10 +109,10 @@ final class LogReplica
void close()
{
- if (folderDescriptor >= 0)
+ if (directoryDescriptor >= 0)
{
- CLibrary.tryCloseFD(folderDescriptor);
- folderDescriptor = -1;
+ CLibrary.tryCloseFD(directoryDescriptor);
+ directoryDescriptor = -1;
}
}
@@ -102,4 +121,31 @@ final class LogReplica
{
return String.format("[%s] ", file);
}
+
+ void setError(String line, String error)
+ {
+ errors.put(line, error);
+ }
+
+ void printContentsWithAnyErrors(StringBuilder str)
+ {
+ str.append(file.getPath());
+ str.append(System.lineSeparator());
+ FileUtils.readLines(file).forEach(line -> printLineWithAnyError(str, line));
+ }
+
+ private void printLineWithAnyError(StringBuilder str, String line)
+ {
+ str.append('\t');
+ str.append(line);
+ str.append(System.lineSeparator());
+
+ String error = errors.get(line);
+ if (error != null)
+ {
+ str.append("\t\t***");
+ str.append(error);
+ str.append(System.lineSeparator());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index c557bf2..47a9901 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -32,7 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Throwables;
/**
@@ -60,31 +59,31 @@ public class LogReplicaSet
void addReplica(File file)
{
- File folder = file.getParentFile();
- assert !replicasByFile.containsKey(folder);
- replicasByFile.put(folder, LogReplica.open(file));
+ File directory = file.getParentFile();
+ assert !replicasByFile.containsKey(directory);
+ replicasByFile.put(directory, LogReplica.open(file));
if (logger.isTraceEnabled())
logger.trace("Added log file replica {} ", file);
}
- void maybeCreateReplica(File folder, String fileName, Set<LogRecord> records)
+ void maybeCreateReplica(File directory, String fileName, Set<LogRecord> records)
{
- if (replicasByFile.containsKey(folder))
+ if (replicasByFile.containsKey(directory))
return;
- final LogReplica replica = LogReplica.create(folder, fileName);
+ final LogReplica replica = LogReplica.create(directory, fileName);
records.forEach(replica::append);
- replicasByFile.put(folder, replica);
+ replicasByFile.put(directory, replica);
if (logger.isTraceEnabled())
logger.trace("Created new file replica {}", replica);
}
- Throwable syncFolder(Throwable accumulate)
+ Throwable syncDirectory(Throwable accumulate)
{
- return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncFolder));
+ return Throwables.perform(accumulate, replicas().stream().map(s -> s::syncDirectory));
}
Throwable delete(Throwable accumulate)
@@ -101,15 +100,18 @@ public class LogReplicaSet
boolean readRecords(Set<LogRecord> records)
{
- Map<File, List<String>> linesByReplica = replicas().stream()
- .map(LogReplica::file)
- .collect(Collectors.toMap(Function.<File>identity(), FileUtils::readLines));
+ Map<LogReplica, List<String>> linesByReplica = replicas().stream()
+ .collect(Collectors.toMap(Function.<LogReplica>identity(),
+ LogReplica::readLines,
+ (k, v) -> {throw new IllegalStateException("Duplicated key: " + k);},
+ LinkedHashMap::new));
+
int maxNumLines = linesByReplica.values().stream().map(List::size).reduce(0, Integer::max);
for (int i = 0; i < maxNumLines; i++)
{
String firstLine = null;
boolean partial = false;
- for (Map.Entry<File, List<String>> entry : linesByReplica.entrySet())
+ for (Map.Entry<LogReplica, List<String>> entry : linesByReplica.entrySet())
{
List<String> currentLines = entry.getValue();
if (i >= currentLines.size())
@@ -125,9 +127,10 @@ public class LogReplicaSet
if (!isPrefixMatch(firstLine, currentLine))
{ // not a prefix match
logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up",
- entry.getKey().getName(),
+ entry.getKey().getFileName(),
currentLine,
firstLine);
+ entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine));
return false;
}
@@ -136,7 +139,7 @@ public class LogReplicaSet
if (i == currentLines.size() - 1)
{ // last record, just set record as invalid and move on
logger.warn("Mismatched last line in file {}: '{}' not the same as '{}'",
- entry.getKey().getName(),
+ entry.getKey().getFileName(),
currentLine,
firstLine);
@@ -148,9 +151,10 @@ public class LogReplicaSet
else
{ // mismatched entry file has more lines, giving up
logger.error("Mismatched line in file {}: got '{}' expected '{}', giving up",
- entry.getKey().getName(),
+ entry.getKey().getFileName(),
currentLine,
firstLine);
+ entry.getKey().setError(currentLine, String.format("Does not match <%s> in first replica file", firstLine));
return false;
}
}
@@ -160,6 +164,7 @@ public class LogReplicaSet
if (records.contains(record))
{ // duplicate records
logger.error("Found duplicate record {} for {}, giving up", record, record.fileName());
+ setError(record, "Duplicated record");
return false;
}
@@ -171,6 +176,7 @@ public class LogReplicaSet
if (record.isFinal() && i != (maxNumLines - 1))
{ // too many final records
logger.error("Found too many lines for {}, giving up", record.fileName());
+ setError(record, "This record should have been the last one in all replicas");
return false;
}
}
@@ -178,6 +184,22 @@ public class LogReplicaSet
return true;
}
+ void setError(LogRecord record, String error)
+ {
+ record.setError(error);
+ setErrorInReplicas(record);
+ }
+
+ void setErrorInReplicas(LogRecord record)
+ {
+ replicas().forEach(r -> r.setError(record.raw, record.error()));
+ }
+
+ void printContentsWithAnyErrors(StringBuilder str)
+ {
+ replicas().forEach(r -> r.printContentsWithAnyErrors(str));
+ }
+
/**
* Add the record to all the replicas: if it is a final record then we throw only if we fail to write it
* to all, otherwise we throw if we fail to write it to any file, see CASSANDRA-10421 for details
@@ -216,6 +238,11 @@ public class LogReplicaSet
: "[-]";
}
+ String getDirectories()
+ {
+ return String.join(", ", replicas().stream().map(LogReplica::getDirectory).collect(Collectors.toList()));
+ }
+
@VisibleForTesting
List<File> getFiles()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index ce76165..b441454 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -24,6 +24,7 @@ import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Runnables;
@@ -55,7 +56,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
* IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
* txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure
* a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
- * outside of LT. @see FileLister.classifyFiles(TransactionData txn)
+ * outside of LT. @see LogAwareFileLister.classifyFiles()
*
* A class that tracks sstable files involved in a transaction across sstables:
* if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
@@ -67,8 +68,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
*
* where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
* removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
- * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
- * and file sizes.
+ * last update time of all files for the sstable descriptor and the number of sstable files.
*
* Upon commit we add a final line to the log file:
*
@@ -238,27 +238,29 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
public void run()
{
if (logger.isTraceEnabled())
- logger.trace("Removing files for transaction {}", name());
+ logger.trace("Removing files for transaction log {}", data);
if (!data.completed())
{ // this happens if we forget to close a txn and the garbage collector closes it for us
- logger.error("{} was not completed, trying to abort it now", data);
+ logger.error("Transaction log {} indicates txn was not completed, trying to abort it now", data);
Throwable err = Throwables.perform((Throwable)null, data::abort);
if (err != null)
- logger.error("Failed to abort {}", data, err);
+ logger.error("Failed to abort transaction log {}", data, err);
}
Throwable err = data.removeUnfinishedLeftovers(null);
if (err != null)
{
- logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
+ logger.info("Failed deleting files for transaction log {}, we'll retry after GC and on on server restart",
+ data,
+ err);
failedDeletions.add(this);
}
else
{
if (logger.isTraceEnabled())
- logger.trace("Closing file transaction {}", name());
+ logger.trace("Closing transaction log {}", data);
data.close();
}
@@ -360,7 +362,7 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
}
catch (Throwable t)
{
- logger.error("Failed to complete file transaction {}", id(), t);
+ logger.error("Failed to complete file transaction id {}", id(), t);
return Throwables.merge(accumulate, t);
}
}
@@ -378,31 +380,43 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
protected void doPrepare() { }
/**
- * Called on startup to scan existing folders for any unfinished leftovers of
- * operations that were ongoing when the process exited. Also called by the standalone
- * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
+ * Removes any leftovers from unifinished transactions as indicated by any transaction log files that
+ * are found in the table directories. This means that any old sstable files for transactions that were committed,
+ * or any new sstable files for transactions that were aborted or still in progress, should be removed *if
+ * it is safe to do so*. Refer to the checks in LogFile.verify for further details on the safety checks
+ * before removing transaction leftovers and refer to the comments at the beginning of this file or in NEWS.txt
+ * for further details on transaction logs.
+ *
+ * This method is called on startup and by the standalone sstableutil tool when the cleanup option is specified,
+ * @see StandaloneSSTableUtil.
+ *
+ * @return true if the leftovers of all transaction logs found were removed, false otherwise.
*
*/
- static void removeUnfinishedLeftovers(CFMetaData metadata)
+ static boolean removeUnfinishedLeftovers(CFMetaData metadata)
{
- removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
+ return removeUnfinishedLeftovers(new Directories(metadata).getCFDirectories());
}
@VisibleForTesting
- static void removeUnfinishedLeftovers(List<File> folders)
+ static boolean removeUnfinishedLeftovers(List<File> directories)
{
LogFilesByName logFiles = new LogFilesByName();
- folders.forEach(logFiles::list);
- logFiles.removeUnfinishedLeftovers();
+ directories.forEach(logFiles::list);
+ return logFiles.removeUnfinishedLeftovers();
}
private static final class LogFilesByName
{
+ // This maps a transaction log file name to a list of physical files. Each sstable
+ // can have multiple directories and a transaction is trakced by identical transaction log
+ // files, one per directory. So for each transaction file name we can have multiple
+ // physical files.
Map<String, List<File>> files = new HashMap<>();
- void list(File folder)
+ void list(File directory)
{
- Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add);
+ Arrays.stream(directory.listFiles(LogFile::isLogFile)).forEach(this::add);
}
void add(File file)
@@ -417,25 +431,35 @@ class LogTransaction extends Transactional.AbstractTransactional implements Tran
filesByName.add(file);
}
- void removeUnfinishedLeftovers()
+ boolean removeUnfinishedLeftovers()
{
- files.forEach(LogFilesByName::removeUnfinishedLeftovers);
+ return files.entrySet()
+ .stream()
+ .map(LogFilesByName::removeUnfinishedLeftovers)
+ .allMatch(Predicate.isEqual(true));
}
- static void removeUnfinishedLeftovers(String name, List<File> logFiles)
+ static boolean removeUnfinishedLeftovers(Map.Entry<String, List<File>> entry)
{
- LogFile txn = LogFile.make(name, logFiles);
+ LogFile txn = LogFile.make(entry.getKey(), entry.getValue());
try
{
if (txn.verify())
{
Throwable failure = txn.removeUnfinishedLeftovers(null);
if (failure != null)
- logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure);
+ {
+ logger.error("Failed to remove unfinished transaction leftovers for transaction log {}",
+ txn.toString(true), failure);
+ return false;
+ }
+
+ return true;
}
else
{
- logger.error("Unexpected disk state: failed to read transaction txn {}", txn);
+ logger.error("Unexpected disk state: failed to read transaction log {}", txn.toString(true));
+ return false;
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/exceptions/StartupException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/StartupException.java b/src/java/org/apache/cassandra/exceptions/StartupException.java
index ec4890f..1513cf9 100644
--- a/src/java/org/apache/cassandra/exceptions/StartupException.java
+++ b/src/java/org/apache/cassandra/exceptions/StartupException.java
@@ -23,6 +23,10 @@ package org.apache.cassandra.exceptions;
*/
public class StartupException extends Exception
{
+ public final static int ERR_WRONG_MACHINE_STATE = 1;
+ public final static int ERR_WRONG_DISK_STATE = 3;
+ public final static int ERR_WRONG_CONFIG = 100;
+
public final int returnCode;
public StartupException(int returnCode, String message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 183abf8..b84a5e3 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -236,7 +236,16 @@ public class CassandraDaemon
continue;
for (CFMetaData cfm : Schema.instance.getTablesAndViews(keyspaceName))
- ColumnFamilyStore.scrubDataDirectories(cfm);
+ {
+ try
+ {
+ ColumnFamilyStore.scrubDataDirectories(cfm);
+ }
+ catch (StartupException e)
+ {
+ exitOrFail(e.returnCode, e.getMessage(), e.getCause());
+ }
+ }
}
Keyspace.setInitialized();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/src/java/org/apache/cassandra/service/StartupChecks.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index e903721..7c6c91a 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -134,8 +134,9 @@ public class StartupChecks
{
long now = System.currentTimeMillis();
if (now < EARLIEST_LAUNCH_DATE)
- throw new StartupException(1, String.format("current machine time is %s, but that is seemingly incorrect. exiting now.",
- new Date(now).toString()));
+ throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE,
+ String.format("current machine time is %s, but that is seemingly incorrect. exiting now.",
+ new Date(now).toString()));
}
};
@@ -186,7 +187,7 @@ public class StartupChecks
{
// Fail-fast if JNA is not available or failing to initialize properly
if (!CLibrary.jnaAvailable())
- throw new StartupException(3, "JNA failing to initialize properly. ");
+ throw new StartupException(StartupException.ERR_WRONG_MACHINE_STATE, "JNA failing to initialize properly. ");
}
};
@@ -216,12 +217,14 @@ public class StartupChecks
logger.warn("Directory {} doesn't exist", dataDir);
// if they don't, failing their creation, stop cassandra.
if (!dir.mkdirs())
- throw new StartupException(3, "Has no permission to create directory "+ dataDir);
+ throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+ "Has no permission to create directory "+ dataDir);
}
// if directories exist verify their permissions
if (!Directories.verifyFullPermissions(dir, dataDir))
- throw new StartupException(3, "Insufficient permissions on directory " + dataDir);
+ throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+ "Insufficient permissions on directory " + dataDir);
}
};
@@ -272,11 +275,12 @@ public class StartupChecks
}
if (!invalid.isEmpty())
- throw new StartupException(3, String.format("Detected unreadable sstables %s, please check " +
- "NEWS.txt and ensure that you have upgraded through " +
- "all required intermediate versions, running " +
- "upgradesstables",
- Joiner.on(",").join(invalid)));
+ throw new StartupException(StartupException.ERR_WRONG_DISK_STATE,
+ String.format("Detected unreadable sstables %s, please check " +
+ "NEWS.txt and ensure that you have upgraded through " +
+ "all required intermediate versions, running " +
+ "upgradesstables",
+ Joiner.on(",").join(invalid)));
}
};
@@ -318,7 +322,7 @@ public class StartupChecks
String formatMessage = "Cannot start node if snitch's data center (%s) differs from previous data center (%s). " +
"Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_dc=true.";
- throw new StartupException(100, String.format(formatMessage, currentDc, storedDc));
+ throw new StartupException(StartupException.ERR_WRONG_CONFIG, String.format(formatMessage, currentDc, storedDc));
}
}
}
@@ -340,7 +344,7 @@ public class StartupChecks
String formatMessage = "Cannot start node if snitch's rack (%s) differs from previous rack (%s). " +
"Please fix the snitch configuration, decommission and rebootstrap this node or use the flag -Dcassandra.ignore_rack=true.";
- throw new StartupException(100, String.format(formatMessage, currentRack, storedRack));
+ throw new StartupException(StartupException.ERR_WRONG_CONFIG, String.format(formatMessage, currentRack, storedRack));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/11910c6c/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 4f2fc73..59958bb 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -519,7 +519,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
LogAwareFileLister.getTemporaryFiles(dataFolder2));
// normally called at startup
- LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+ assertTrue(LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
// new tables should be only table left
assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[1].getAllFilePaths()));
@@ -570,7 +570,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
LogAwareFileLister.getTemporaryFiles(dataFolder2));
// normally called at startup
- LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+ assertTrue(LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
// old tables should be only table left
assertFiles(dataFolder1.getPath(), new HashSet<>(sstables[0].getAllFilePaths()));
@@ -735,7 +735,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
Arrays.stream(sstables).forEach(s -> s.selfRef().release());
- LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2));
+ // if shouldCommit is true then it should remove the leftovers and return true, false otherwise
+ assertEquals(shouldCommit, LogTransaction.removeUnfinishedLeftovers(Arrays.asList(dataFolder1, dataFolder2)));
LogTransaction.waitForDeletions();
if (shouldCommit)
@@ -862,7 +863,7 @@ public class LogTransactionTest extends AbstractTransactionalTest
if (filePath.endsWith("Data.db"))
{
assertTrue(FileUtils.delete(filePath));
- assertNull(t.txnFile().syncFolder(null));
+ assertNull(t.txnFile().syncDirectory(null));
break;
}
}