Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/08 12:56:56 UTC
[3/7] cassandra git commit: Handle non-atomic directory streams safely (CASSANDRA-10109)
Handle non-atomic directory streams safely (CASSANDRA-10109)
This patch refactors the lifecycle transaction log and updates the logic to be robust to non-atomic listings of directories.
patch by stefania; reviewed by benedict for CASSANDRA-10109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351c7cac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351c7cac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351c7cac
Branch: refs/heads/cassandra-3.0
Commit: 351c7caca311834f6c5bff08b0204943850214a9
Parents: 3818d30
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Aug 27 14:09:45 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Sep 8 11:53:22 2015 +0100
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../org/apache/cassandra/db/Directories.java | 5 +-
.../apache/cassandra/db/lifecycle/Helpers.java | 12 +-
.../db/lifecycle/LifecycleTransaction.java | 64 +-
.../db/lifecycle/LogAwareFileLister.java | 196 +++
.../apache/cassandra/db/lifecycle/LogFile.java | 364 ++++++
.../cassandra/db/lifecycle/LogRecord.java | 208 ++++
.../cassandra/db/lifecycle/LogTransaction.java | 418 +++++++
.../apache/cassandra/db/lifecycle/Tracker.java | 4 +-
.../cassandra/db/lifecycle/TransactionLog.java | 1141 ------------------
.../apache/cassandra/io/sstable/SSTable.java | 1 -
.../io/sstable/format/SSTableReader.java | 5 +-
.../org/apache/cassandra/io/util/FileUtils.java | 10 +-
.../apache/cassandra/service/GCInspector.java | 4 +-
.../cassandra/service/StorageService.java | 4 +-
.../cassandra/tools/StandaloneScrubber.java | 3 +-
.../cassandra/tools/StandaloneSplitter.java | 3 +-
.../cassandra/tools/StandaloneUpgrader.java | 3 +-
.../org/apache/cassandra/utils/CLibrary.java | 2 +-
.../org/apache/cassandra/db/KeyCacheTest.java | 5 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 3 +-
.../cassandra/db/lifecycle/HelpersTest.java | 4 +-
.../db/lifecycle/LifecycleTransactionTest.java | 2 +-
.../db/lifecycle/LogTransactionTest.java | 823 +++++++++++++
.../db/lifecycle/RealTransactionsTest.java | 7 +-
.../cassandra/db/lifecycle/TrackerTest.java | 7 +-
.../db/lifecycle/TransactionLogTest.java | 812 -------------
.../io/sstable/SSTableRewriterTest.java | 35 +-
.../org/apache/cassandra/schema/DefsTest.java | 4 +-
29 files changed, 2109 insertions(+), 2042 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 096172d..979e8ba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -510,7 +510,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
SystemKeyspace.removeTruncationRecord(metadata.cfId);
data.dropSSTables();
- TransactionLog.waitForDeletions();
+ LifecycleTransaction.waitForDeletions();
indexManager.invalidateAllIndexesBlocking();
materializedViewManager.invalidate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index c17b1fd..c801952 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -32,6 +32,7 @@ import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
@@ -550,7 +551,7 @@ public class Directories
TEMPORARY,
/** A transaction log file (contains information on final and temporary files). */
- TXN_LOG
+ TXN_LOG;
}
/**
@@ -562,7 +563,7 @@ public class Directories
/** Throw the exception */
THROW,
- /** Ignore the txn log file */
+ /** Ignore the problematic parts of the txn log file */
IGNORE
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index 98983c5..f9555f4 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -106,12 +106,12 @@ class Helpers
assert !reader.isReplaced();
}
- static Throwable markObsolete(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
+ static Throwable markObsolete(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
{
if (obsoletions == null || obsoletions.isEmpty())
return accumulate;
- for (TransactionLog.Obsoletion obsoletion : obsoletions)
+ for (LogTransaction.Obsoletion obsoletion : obsoletions)
{
try
{
@@ -125,13 +125,13 @@ class Helpers
return accumulate;
}
- static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLog txnLogs, List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
+ static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
{
for (SSTableReader reader : readers)
{
try
{
- obsoletions.add(new TransactionLog.Obsoletion(reader, txnLogs.obsoleted(reader)));
+ obsoletions.add(new LogTransaction.Obsoletion(reader, txnLogs.obsoleted(reader)));
}
catch (Throwable t)
{
@@ -141,12 +141,12 @@ class Helpers
return accumulate;
}
- static Throwable abortObsoletion(List<TransactionLog.Obsoletion> obsoletions, Throwable accumulate)
+ static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
{
if (obsoletions == null || obsoletions.isEmpty())
return accumulate;
- for (TransactionLog.Obsoletion obsoletion : obsoletions)
+ for (LogTransaction.Obsoletion obsoletion : obsoletions)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 520b229..59bbc7d 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -98,7 +98,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public final Tracker tracker;
// The transaction logs keep track of new and old sstable files
- private final TransactionLog transactionLog;
+ private final LogTransaction log;
// the original readers this transaction was opened over, and that it guards
// (no other transactions may operate over these readers concurrently)
private final Set<SSTableReader> originals = new HashSet<>();
@@ -115,7 +115,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
private final State staged = new State();
// the tidier and their readers, to be used for marking readers obsoleted during a commit
- private List<TransactionLog.Obsoletion> obsoletions;
+ private List<LogTransaction.Obsoletion> obsoletions;
/**
* construct a Transaction for use in an offline operation
@@ -143,7 +143,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
{
Tracker dummy = new Tracker(null, false);
- return new LifecycleTransaction(dummy, new TransactionLog(operationType, metadata, dummy), Collections.emptyList());
+ return new LifecycleTransaction(dummy, new LogTransaction(operationType, metadata, dummy), Collections.emptyList());
}
/**
@@ -152,18 +152,18 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
{
Tracker dummy = new Tracker(null, false);
- return new LifecycleTransaction(dummy, new TransactionLog(operationType, operationFolder, dummy), Collections.emptyList());
+ return new LifecycleTransaction(dummy, new LogTransaction(operationType, operationFolder, dummy), Collections.emptyList());
}
LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
{
- this(tracker, new TransactionLog(operationType, getMetadata(tracker, readers), tracker), readers);
+ this(tracker, new LogTransaction(operationType, getMetadata(tracker, readers), tracker), readers);
}
- LifecycleTransaction(Tracker tracker, TransactionLog transactionLog, Iterable<SSTableReader> readers)
+ LifecycleTransaction(Tracker tracker, LogTransaction log, Iterable<SSTableReader> readers)
{
this.tracker = tracker;
- this.transactionLog = transactionLog;
+ this.log = log;
for (SSTableReader reader : readers)
{
originals.add(reader);
@@ -187,19 +187,19 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return null;
}
- public TransactionLog log()
+ public LogTransaction log()
{
- return transactionLog;
+ return log;
}
public OperationType opType()
{
- return transactionLog.getType();
+ return log.getType();
}
public UUID opId()
{
- return transactionLog.getId();
+ return log.getId();
}
public void doPrepare()
@@ -212,8 +212,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
// prepare for compaction obsolete readers as long as they were part of the original set
// since those that are not original are early readers that share the same desc with the finals
- maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLog, obsoletions = new ArrayList<>(), null));
- transactionLog.prepareToCommit();
+ maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), log, obsoletions = new ArrayList<>(), null));
+ log.prepareToCommit();
}
/**
@@ -228,7 +228,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
maybeFail(accumulate);
// transaction log commit failure means we must abort; safe commit is not possible
- maybeFail(transactionLog.commit(null));
+ maybeFail(log.commit(null));
// this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
// we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
@@ -237,7 +237,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
accumulate = markObsolete(obsoletions, accumulate);
accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
accumulate = release(selfRefs(logged.obsolete), accumulate);
- accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLog.getType(), accumulate);
+ accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.getType(), accumulate);
return accumulate;
}
@@ -253,16 +253,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
accumulate = abortObsoletion(obsoletions, accumulate);
if (logged.isEmpty() && staged.isEmpty())
- return transactionLog.abort(accumulate);
+ return log.abort(accumulate);
// mark obsolete all readers that are not versions of those present in the original set
Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
logger.debug("Obsoleting {}", obsolete);
- accumulate = prepareForObsoletion(obsolete, transactionLog, obsoletions = new ArrayList<>(), accumulate);
+ accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate);
// it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
// a failure to abort, which is useful information to have for debug
- accumulate = transactionLog.abort(accumulate);
+ accumulate = log.abort(accumulate);
accumulate = markObsolete(obsoletions, accumulate);
// replace all updated readers with a version restored to its original state
@@ -502,7 +502,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
originals.remove(reader);
marked.remove(reader);
}
- return new LifecycleTransaction(tracker, transactionLog.getType(), readers);
+ return new LifecycleTransaction(tracker, log.getType(), readers);
}
/**
@@ -535,17 +535,17 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public void trackNew(SSTable table)
{
- transactionLog.trackNew(table);
+ log.trackNew(table);
}
public void untrackNew(SSTable table)
{
- transactionLog.untrackNew(table);
+ log.untrackNew(table);
}
public static void removeUnfinishedLeftovers(CFMetaData metadata)
{
- TransactionLog.removeUnfinishedLeftovers(metadata);
+ LogTransaction.removeUnfinishedLeftovers(metadata);
}
/**
@@ -562,7 +562,25 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
*/
public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr)
{
- return new TransactionLog.FileLister(folder, filter, onTxnErr).list();
+ return new LogAwareFileLister(folder, filter, onTxnErr).list();
+ }
+
+ /**
+ * Retry all deletions that failed the first time around (presumably because the sstable was still mmap'd).
+ * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
+ */
+ public static void rescheduleFailedDeletions()
+ {
+ LogTransaction.rescheduleFailedDeletions();
+ }
+
+ /**
+ * Deletions run on the nonPeriodicTasks executor (both failedDeletions and global tidiers in SSTableReader),
+ * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
+ */
+ public static void waitForDeletions()
+ {
+ LogTransaction.waitForDeletions();
}
// a class representing the current state of the reader within this transaction, encoding the actions both logged
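For reference, a minimal caller-side sketch of the two delegating entry points added above (previously reached via TransactionLog); only the two static calls are the API here, the class wrapper is illustrative and assumes a Cassandra classpath:

    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;

    public class DeletionMaintenanceSketch
    {
        public static void retryAndWait()
        {
            LifecycleTransaction.rescheduleFailedDeletions(); // retry deletions that failed earlier (e.g. file still mmap'd)
            LifecycleTransaction.waitForDeletions();          // block until deletions pending on nonPeriodicTasks have run
        }
    }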
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
new file mode 100644
index 0000000..e086078
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogAwareFileLister.java
@@ -0,0 +1,196 @@
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.db.Directories;
+
+import static org.apache.cassandra.db.Directories.*;
+
+/**
+ * A class for listing files in a folder that is aware of transaction log files, so that
+ * temporary files from in-flight or aborted transactions can be classified correctly.
+ */
+final class LogAwareFileLister
+{
+ // The folder to scan
+ private final Path folder;
+
+ // The filter determines which files the client wants returned
+ private final BiFunction<File, FileType, Boolean> filter; //file, file type
+
+ // The behavior when we fail to list files
+ private final OnTxnErr onTxnErr;
+
+ // The unfiltered result
+ NavigableMap<File, Directories.FileType> files = new TreeMap<>();
+
+ @VisibleForTesting
+ LogAwareFileLister(Path folder, BiFunction<File, FileType, Boolean> filter, OnTxnErr onTxnErr)
+ {
+ this.folder = folder;
+ this.filter = filter;
+ this.onTxnErr = onTxnErr;
+ }
+
+ public List<File> list()
+ {
+ try
+ {
+ return innerList();
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(String.format("Failed to list files in %s", folder), t);
+ }
+ }
+
+ List<File> innerList() throws Throwable
+ {
+ list(Files.newDirectoryStream(folder))
+ .stream()
+ .filter((f) -> !LogFile.isLogFile(f))
+ .forEach((f) -> files.put(f, FileType.FINAL));
+
+ // Since many file systems are not atomic, we cannot be sure we have listed a consistent disk state
+ // (Linux would permit this, but for simplicity we keep our behaviour the same across platforms)
+ // so we must be careful to list txn log files AFTER every other file since these files are deleted last,
+ // after all other files are removed
+ list(Files.newDirectoryStream(folder, '*' + LogFile.EXT))
+ .stream()
+ .filter(LogFile::isLogFile)
+ .forEach(this::classifyFiles);
+
+ // Finally we apply the user filter before returning our result
+ return files.entrySet().stream()
+ .filter((e) -> filter.apply(e.getKey(), e.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ static List<File> list(DirectoryStream<Path> stream) throws IOException
+ {
+ try
+ {
+ return StreamSupport.stream(stream.spliterator(), false)
+ .map(Path::toFile)
+ .filter((f) -> !f.isDirectory())
+ .collect(Collectors.toList());
+ }
+ finally
+ {
+ stream.close();
+ }
+ }
+
+ /**
+ * We read txn log files; if reading fails, we throw only if the user has specified
+ * OnTxnErr.THROW, otherwise we log an error and apply the txn log anyway
+ */
+ void classifyFiles(File txnFile)
+ {
+ LogFile txn = LogFile.make(txnFile, -1);
+ readTxnLog(txn);
+ classifyFiles(txn);
+ files.put(txnFile, FileType.TXN_LOG);
+ }
+
+ void readTxnLog(LogFile txn)
+ {
+ txn.readRecords();
+ if (!txn.verify() && onTxnErr == OnTxnErr.THROW)
+ throw new LogTransaction.CorruptTransactionLogException("Some records failed verification. See earlier in log for details.", txn);
+ }
+
+ void classifyFiles(LogFile txnFile)
+ {
+ Map<LogRecord, Set<File>> oldFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.REMOVE);
+ Map<LogRecord, Set<File>> newFiles = txnFile.getFilesOfType(files.navigableKeySet(), LogRecord.Type.ADD);
+
+ if (txnFile.completed())
+ { // last record present, filter regardless of disk status
+ setTemporary(txnFile, oldFiles.values(), newFiles.values());
+ return;
+ }
+
+ if (allFilesPresent(txnFile, oldFiles, newFiles))
+ { // all files present, transaction is in progress, this will filter as aborted
+ setTemporary(txnFile, oldFiles.values(), newFiles.values());
+ return;
+ }
+
+ // some files are missing, we expect the txn file to either also be missing or completed, so check
+ // disk state again to resolve any previous races on non-atomic directory listing platforms
+
+ // if the txn file is also gone, then do nothing (all temporary files should be gone; we could remove any that remain)
+ if (!txnFile.exists())
+ return;
+
+ // otherwise read the file again to see if it is completed now
+ readTxnLog(txnFile);
+
+ if (txnFile.completed())
+ { // if after re-reading the txn is completed then filter accordingly
+ setTemporary(txnFile, oldFiles.values(), newFiles.values());
+ return;
+ }
+
+ // some files are missing and yet the txn is still there and not completed
+ // something must be wrong (see comment at the top of this file requiring txn to be
+ // completed before obsoleting or aborting sstables)
+ throw new RuntimeException(String.format("Failed to list directory files in %s, inconsistent disk state for transaction %s",
+ folder,
+ txnFile));
+ }
+
+ /** See if all files are present or if only the last record files are missing and it's a NEW record */
+ private static boolean allFilesPresent(LogFile txnFile, Map<LogRecord, Set<File>> oldFiles, Map<LogRecord, Set<File>> newFiles)
+ {
+ LogRecord lastRecord = txnFile.getLastRecord();
+ return !Stream.concat(oldFiles.entrySet().stream(),
+ newFiles.entrySet().stream()
+ .filter((e) -> e.getKey() != lastRecord))
+ .filter((e) -> e.getKey().numFiles > e.getValue().size())
+ .findFirst().isPresent();
+ }
+
+ private void setTemporary(LogFile txnFile, Collection<Set<File>> oldFiles, Collection<Set<File>> newFiles)
+ {
+ Collection<Set<File>> temporary = txnFile.committed() ? oldFiles : newFiles;
+ temporary.stream()
+ .flatMap(Set::stream)
+ .forEach((f) -> this.files.put(f, FileType.TEMPORARY));
+ }
+
+ @VisibleForTesting
+ static Set<File> getTemporaryFiles(File folder)
+ {
+ return listFiles(folder, FileType.TEMPORARY);
+ }
+
+ @VisibleForTesting
+ static Set<File> getFinalFiles(File folder)
+ {
+ return listFiles(folder, FileType.FINAL);
+ }
+
+ @VisibleForTesting
+ static Set<File> listFiles(File folder, FileType ... types)
+ {
+ Collection<FileType> match = Arrays.asList(types);
+ return new LogAwareFileLister(folder.toPath(),
+ (file, type) -> match.contains(type),
+ OnTxnErr.IGNORE).list()
+ .stream()
+ .collect(Collectors.toSet());
+ }
+}
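A minimal usage sketch of the new lister through the public LifecycleTransaction.getFiles facade, assuming a configured Cassandra classpath; the data folder path below is hypothetical:

    import java.io.File;
    import java.nio.file.Paths;
    import java.util.List;

    import org.apache.cassandra.db.Directories;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;

    public class ListFinalFilesSketch
    {
        public static void main(String[] args)
        {
            // Keep only FINAL sstable files; temporary files of in-flight/aborted transactions
            // and the txn log files themselves are filtered out by the lister.
            List<File> finalFiles = LifecycleTransaction.getFiles(
                Paths.get("/var/lib/cassandra/data/ks/table"),       // hypothetical table folder
                (file, type) -> type == Directories.FileType.FINAL,  // the user filter
                Directories.OnTxnErr.IGNORE);                        // tolerate unreadable txn logs

            finalFiles.forEach(f -> System.out.println(f.getName()));
        }
    }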
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
new file mode 100644
index 0000000..c698722
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -0,0 +1,364 @@
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LogRecord.Type;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * The transaction log file, which contains many records.
+ */
+final class LogFile
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogFile.class);
+
+ static String EXT = ".log";
+ static char SEP = '_';
+ // cc_txn_opname_id.log (where cc is one of the sstable versions defined in BigVersion)
+ static Pattern FILE_REGEX = Pattern.compile(String.format("^(.{2})_txn_(.*)_(.*)%s$", EXT));
+
+ final File file;
+ final Set<LogRecord> records = new LinkedHashSet<>();
+ final OperationType opType;
+ final UUID id;
+ final File folder;
+ final int folderDescriptor;
+
+ static LogFile make(File logFile, int folderDescriptor)
+ {
+ Matcher matcher = LogFile.FILE_REGEX.matcher(logFile.getName());
+ assert matcher.matches() && matcher.groupCount() == 3;
+
+ // For now we don't need this, but it is there in case we need to change the
+ // file format later on; the version is the sstable version as defined in BigFormat
+ //String version = matcher.group(1);
+
+ OperationType operationType = OperationType.fromFileName(matcher.group(2));
+ UUID id = UUID.fromString(matcher.group(3));
+
+ return new LogFile(operationType, logFile.getParentFile(), folderDescriptor, id);
+ }
+
+ void sync()
+ {
+ if (folderDescriptor > 0)
+ CLibrary.trySync(folderDescriptor);
+ }
+
+ OperationType getType()
+ {
+ return opType;
+ }
+
+ UUID getId()
+ {
+ return id;
+ }
+
+ Throwable removeUnfinishedLeftovers(Throwable accumulate)
+ {
+ try
+ {
+ deleteRecords(committed() ? Type.REMOVE : Type.ADD);
+
+ // we sync the parent file descriptor between contents and log deletion
+ // to ensure there is a happens before edge between them
+ sync();
+
+ Files.delete(file.toPath());
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+
+ return accumulate;
+ }
+
+ static boolean isLogFile(File file)
+ {
+ return LogFile.FILE_REGEX.matcher(file.getName()).matches();
+ }
+
+ LogFile(OperationType opType, File folder, int folderDescriptor, UUID id)
+ {
+ this.opType = opType;
+ this.id = id;
+ this.folder = folder;
+ this.file = new File(getFileName(folder, opType, id));
+ this.folderDescriptor = folderDescriptor;
+ }
+
+ public void readRecords()
+ {
+ assert records.isEmpty();
+ FileUtils.readLines(file).stream()
+ .map(LogRecord::make)
+ .forEach(records::add);
+ }
+
+ public boolean verify()
+ {
+ Optional<LogRecord> firstInvalid = records.stream()
+ .filter(this::isInvalid)
+ .findFirst();
+
+ if (!firstInvalid.isPresent())
+ return true;
+
+ LogRecord failedOn = firstInvalid.get();
+ if (getLastRecord() != failedOn)
+ {
+ logError(failedOn);
+ return false;
+ }
+
+ if (records.stream()
+ .filter((r) -> r != failedOn)
+ .filter(LogFile::isInvalidWithCorruptedLastRecord)
+ .map(LogFile::logError)
+ .findFirst().isPresent())
+ {
+ logError(failedOn);
+ return false;
+ }
+
+ // if only the last record is corrupt and all other records have matching files on disk (@see isInvalid),
+ // then we simply exited whilst serializing the last record and we carry on
+ logger.warn(String.format("Last record of transaction %s is corrupt or incomplete [%s], but all previous records match state on disk; continuing",
+ id,
+ failedOn.error));
+ return true;
+ }
+
+ static LogRecord logError(LogRecord record)
+ {
+ logger.error("{}", record.error);
+ return record;
+ }
+
+ boolean isInvalid(LogRecord record)
+ {
+ if (!record.isValid())
+ return true;
+
+ if (record.type == Type.UNKNOWN)
+ {
+ record.error(String.format("Could not parse record [%s]", record));
+ return true;
+ }
+
+ if (record.checksum != record.computeChecksum())
+ {
+ record.error(String.format("Invalid checksum for sstable [%s], record [%s]: [%d] should have been [%d]",
+ record.relativeFilePath,
+ record,
+ record.checksum,
+ record.computeChecksum()));
+ return true;
+ }
+
+ if (record.type != Type.REMOVE)
+ return false;
+
+ List<File> files = record.getExistingFiles(folder);
+
+ // Paranoid sanity checks: we create another record by looking at the files as they are
+ // on disk right now and make sure the information still matches
+ record.onDiskRecord = LogRecord.make(record.type, files, 0, record.relativeFilePath);
+
+ if (record.updateTime != record.onDiskRecord.updateTime && record.onDiskRecord.numFiles > 0)
+ {
+ record.error(String.format("Unexpected files detected for sstable [%s], record [%s]: last update time [%tT] should have been [%tT]",
+ record.relativeFilePath,
+ record,
+ record.onDiskRecord.updateTime,
+ record.updateTime));
+ return true;
+ }
+
+ return false;
+ }
+
+ static boolean isInvalidWithCorruptedLastRecord(LogRecord record)
+ {
+ if (record.type == Type.REMOVE && record.onDiskRecord.numFiles < record.numFiles)
+ { // if we found a corruption in the last record, then we continue only if the number of files matches exactly for all previous records.
+ record.error(String.format("Incomplete fileset detected for sstable [%s], record [%s]: number of files [%d] should have been [%d]. Treating as unrecoverable due to corruption of the final record.",
+ record.relativeFilePath,
+ record.raw,
+ record.onDiskRecord.numFiles,
+ record.numFiles));
+ return true;
+ }
+ return false;
+ }
+
+ public void commit()
+ {
+ assert !completed() : "Already completed!";
+ addRecord(LogRecord.makeCommit(System.currentTimeMillis()));
+ }
+
+ public void abort()
+ {
+ assert !completed() : "Already completed!";
+ addRecord(LogRecord.makeAbort(System.currentTimeMillis()));
+ }
+
+ private boolean isLastRecordValidWithType(Type type)
+ {
+ LogRecord lastRecord = getLastRecord();
+ return lastRecord != null &&
+ lastRecord.type == type &&
+ !isInvalid(lastRecord);
+ }
+
+ public boolean committed()
+ {
+ return isLastRecordValidWithType(Type.COMMIT);
+ }
+
+ public boolean aborted()
+ {
+ return isLastRecordValidWithType(Type.ABORT);
+ }
+
+ public boolean completed()
+ {
+ return committed() || aborted();
+ }
+
+ public void add(Type type, SSTable table)
+ {
+ if (!addRecord(makeRecord(type, table)))
+ throw new IllegalStateException();
+ }
+
+ private LogRecord makeRecord(Type type, SSTable table)
+ {
+ assert type == Type.ADD || type == Type.REMOVE;
+ return LogRecord.make(type, folder, table);
+ }
+
+ private boolean addRecord(LogRecord record)
+ {
+ if (!records.add(record))
+ return false;
+
+ // we only checksum the records, not the checksums themselves
+ FileUtils.append(file, record.toString());
+ sync();
+ return true;
+ }
+
+ public void remove(Type type, SSTable table)
+ {
+ LogRecord record = makeRecord(type, table);
+
+ assert records.contains(record) : String.format("[%s] is not tracked by %s", record, file);
+
+ records.remove(record);
+ deleteRecord(record);
+ }
+
+ public boolean contains(Type type, SSTable table)
+ {
+ return records.contains(makeRecord(type, table));
+ }
+
+ public void deleteRecords(Type type)
+ {
+ assert file.exists() : String.format("Expected %s to exist", file);
+ records.stream()
+ .filter(type::matches)
+ .forEach(this::deleteRecord);
+ records.clear();
+ }
+
+ private void deleteRecord(LogRecord record)
+ {
+ List<File> files = record.getExistingFiles(folder);
+
+ // we sort the files in ascending update time order so that the last update time
+ // stays the same even if we only partially delete files
+ files.sort((f1, f2) -> Long.compare(f1.lastModified(), f2.lastModified()));
+
+ files.forEach(LogTransaction::delete);
+ }
+
+ public Map<LogRecord, Set<File>> getFilesOfType(NavigableSet<File> files, Type type)
+ {
+ Map<LogRecord, Set<File>> ret = new HashMap<>();
+
+ records.stream()
+ .filter(type::matches)
+ .filter(LogRecord::isValid)
+ .forEach((r) -> ret.put(r, getRecordFiles(files, r)));
+
+ return ret;
+ }
+
+ public LogRecord getLastRecord()
+ {
+ return Iterables.getLast(records, null);
+ }
+
+ private Set<File> getRecordFiles(NavigableSet<File> files, LogRecord record)
+ {
+ Set<File> ret = new HashSet<>();
+ for (File file : files.tailSet(new File(folder, record.relativeFilePath)))
+ {
+ if (!file.getName().startsWith(record.relativeFilePath))
+ break;
+ ret.add(file);
+ }
+ return ret;
+ }
+
+ public void delete()
+ {
+ LogTransaction.delete(file);
+ }
+
+ public boolean exists()
+ {
+ return file.exists();
+ }
+
+ @Override
+ public String toString()
+ {
+ return FileUtils.getRelativePath(folder.getPath(), file.getPath());
+ }
+
+ static String getFileName(File folder, OperationType opType, UUID id)
+ {
+ String fileName = StringUtils.join(BigFormat.latestVersion,
+ LogFile.SEP,
+ "txn",
+ LogFile.SEP,
+ opType.fileName,
+ LogFile.SEP,
+ id.toString(),
+ LogFile.EXT);
+ return StringUtils.join(folder, File.separator, fileName);
+ }
+}
+
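For illustration only, the file names produced by getFileName and matched by FILE_REGEX look like ma_txn_compaction_<uuid>.log. The "ma" version prefix and the "compaction" operation name are assumptions (the 3.0 BigFormat version string and OperationType.fileName respectively), and the UUID is made up. A standalone, JDK-only sketch of the round trip:

    import java.util.UUID;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LogFileNameSketch
    {
        // Same shape as LogFile.FILE_REGEX with EXT = ".log"
        static final Pattern FILE_REGEX = Pattern.compile("^(.{2})_txn_(.*)_(.*)\\.log$");

        public static void main(String[] args)
        {
            // "ma" (assumed sstable version) + "_txn_" + operation fileName + "_" + txn id + ".log"
            String name = "ma_txn_compaction_55e63d21-8c4e-11e5-a837-0800200c9a66.log";

            Matcher m = FILE_REGEX.matcher(name);
            if (m.matches())
            {
                String version = m.group(1);            // sstable format version, e.g. "ma"
                String operation = m.group(2);          // OperationType.fileName, e.g. "compaction"
                UUID id = UUID.fromString(m.group(3));  // transaction id
                System.out.println(version + " / " + operation + " / " + id);
            }
        }
    }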
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
new file mode 100644
index 0000000..0f0f3a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -0,0 +1,208 @@
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A log file record, each record is encoded in one line and has different
+ * content depending on the record type.
+ */
+final class LogRecord
+{
+ public enum Type
+ {
+ UNKNOWN, // a record that cannot be parsed
+ ADD, // new files to be retained on commit
+ REMOVE, // old files to be retained on abort
+ COMMIT, // commit flag
+ ABORT; // abort flag
+
+ public static Type fromPrefix(String prefix)
+ {
+ return valueOf(prefix.toUpperCase());
+ }
+
+ public boolean hasFile()
+ {
+ return this == Type.ADD || this == Type.REMOVE;
+ }
+
+ public boolean matches(LogRecord record)
+ {
+ return this == record.type;
+ }
+ }
+
+
+ public final Type type;
+ public final String relativeFilePath;
+ public final long updateTime;
+ public final int numFiles;
+ public final String raw;
+ public final long checksum;
+
+ public String error;
+ public LogRecord onDiskRecord;
+
+ // (add|remove|commit|abort):[*,*,*][checksum]
+ static Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE);
+
+ public static LogRecord make(String line)
+ {
+ try
+ {
+ Matcher matcher = REGEX.matcher(line);
+ if (!matcher.matches())
+ return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line)
+ .error(String.format("Failed to parse [%s]", line));
+
+ Type type = Type.fromPrefix(matcher.group(1));
+ return new LogRecord(type, matcher.group(2), Long.valueOf(matcher.group(3)), Integer.valueOf(matcher.group(4)), Long.valueOf(matcher.group(5)), line);
+ }
+ catch (Throwable t)
+ {
+ return new LogRecord(Type.UNKNOWN, "", 0, 0, 0, line).error(t);
+ }
+ }
+
+ public static LogRecord makeCommit(long updateTime)
+ {
+ return new LogRecord(Type.COMMIT, "", updateTime, 0);
+ }
+
+ public static LogRecord makeAbort(long updateTime)
+ {
+ return new LogRecord(Type.ABORT, "", updateTime, 0);
+ }
+
+ public static LogRecord make(Type type, File parentFolder, SSTable table)
+ {
+ String relativePath = FileUtils.getRelativePath(parentFolder.getPath(), table.descriptor.baseFilename());
+ // why do we take the max of files.size() and table.getAllFilePaths().size()?
+ return make(type, getExistingFiles(parentFolder, relativePath), table.getAllFilePaths().size(), relativePath);
+ }
+
+ public static LogRecord make(Type type, List<File> files, int minFiles, String relativeFilePath)
+ {
+ long lastModified = files.stream().map(File::lastModified).reduce(0L, Long::max);
+ return new LogRecord(type, relativeFilePath, lastModified, Math.max(minFiles, files.size()));
+ }
+
+ private LogRecord(Type type,
+ String relativeFilePath,
+ long updateTime,
+ int numFiles)
+ {
+ this(type, relativeFilePath, updateTime, numFiles, 0, null);
+ }
+
+ private LogRecord(Type type,
+ String relativeFilePath,
+ long updateTime,
+ int numFiles,
+ long checksum,
+ String raw)
+ {
+ this.type = type;
+ this.relativeFilePath = type.hasFile() ? relativeFilePath : ""; // only meaningful for file records
+ this.updateTime = type == Type.REMOVE ? updateTime : 0; // only meaningful for old records
+ this.numFiles = type.hasFile() ? numFiles : 0; // only meaningful for file records
+ if (raw == null)
+ {
+ assert checksum == 0;
+ this.checksum = computeChecksum();
+ this.raw = format();
+ }
+ else
+ {
+ this.checksum = checksum;
+ this.raw = raw;
+ }
+
+ this.error = "";
+ }
+
+ public LogRecord error(Throwable t)
+ {
+ return error(t.getMessage());
+ }
+
+ public LogRecord error(String error)
+ {
+ this.error = error;
+ return this;
+ }
+
+ public boolean isValid()
+ {
+ return this.error.isEmpty();
+ }
+
+ private String format()
+ {
+ return String.format("%s:[%s,%d,%d][%d]", type.toString(), relativeFilePath, updateTime, numFiles, checksum);
+ }
+
+ public List<File> getExistingFiles(File folder)
+ {
+ if (!type.hasFile())
+ return Collections.emptyList();
+
+ return getExistingFiles(folder, relativeFilePath);
+ }
+
+ public static List<File> getExistingFiles(File parentFolder, String relativeFilePath)
+ {
+ return Arrays.asList(parentFolder.listFiles((dir, name) -> name.startsWith(relativeFilePath)));
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // see comment in equals
+ return Objects.hash(type, relativeFilePath, error);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof LogRecord))
+ return false;
+
+ final LogRecord other = (LogRecord)obj;
+
+ // we exclude on purpose checksum, update time and count as
+ // we don't want duplicated records that differ only by
+ // properties that might change on disk, especially COMMIT records,
+ // there should be only one regardless of update time
+ // however we must compare the error to make sure we have more than
+ // one UNKNOWN record, if we fail to parse more than one
+ return type == other.type &&
+ relativeFilePath.equals(other.relativeFilePath) &&
+ error.equals(other.error);
+ }
+
+ @Override
+ public String toString()
+ {
+ return raw;
+ }
+
+ long computeChecksum()
+ {
+ CRC32 crc32 = new CRC32();
+ crc32.update(relativeFilePath.getBytes(FileUtils.CHARSET));
+ crc32.update(type.toString().getBytes(FileUtils.CHARSET));
+ FBUtilities.updateChecksumInt(crc32, (int) updateTime);
+ FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32));
+ FBUtilities.updateChecksumInt(crc32, numFiles);
+ return crc32.getValue() & (Long.MAX_VALUE);
+ }
+}
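A short sketch of how a raw line round-trips through LogRecord.make, written as it might appear in a test living in the same package (LogRecord is package-private). The sstable base name "ma-1-big" and the trailing CRC value are illustrative only, not real checksums:

    package org.apache.cassandra.db.lifecycle;

    public class LogRecordParseSketch
    {
        public static void main(String[] args)
        {
            // add:[relative-base-path,update-time,file-count][crc]
            // A real CRC is computed over the path, type, update time and file count (see computeChecksum).
            LogRecord add = LogRecord.make("add:[ma-1-big,0,8][2463225789]");
            assert add.type == LogRecord.Type.ADD;
            assert add.numFiles == 8;
            assert add.isValid();

            // Anything that does not match REGEX comes back as UNKNOWN with an error set
            LogRecord junk = LogRecord.make("not a record");
            assert junk.type == LogRecord.Type.UNKNOWN;
            assert !junk.isValid();
        }
    }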
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
new file mode 100644
index 0000000..89d7beb
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Runnables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LogRecord.Type;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SnapshotDeletingTask;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
+ * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
+ * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
+ * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
+ *
+ * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the
+ * txn log file itself will not be deleted until all tracked files are deleted. This is required by LogAwareFileLister to ensure
+ * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used
+ * outside of LT. @see LogAwareFileLister#classifyFiles(File txnFile)
+ *
+ * A class that tracks sstable files involved in a transaction across sstables:
+ * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
+ *
+ * The transaction log file contains new and old sstables as follows:
+ *
+ * add:[sstable-2][CRC]
+ * remove:[sstable-1,max_update_time,num files][CRC]
+ *
+ * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be
+ * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the
+ * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times
+ * and file sizes.
+ *
+ * Upon commit we add a final line to the log file:
+ *
+ * commit:[commit_time][CRC]
+ *
+ * When the transaction log is cleaned up by the TransactionTidier (which happens only after any old sstables have been
+ * obsoleted), any sstable files for old sstables are removed before deleting the transaction log if the transaction
+ * was committed, and vice versa if the transaction was aborted.
+ *
+ * On start-up we look for any transaction log files and repeat the cleanup process described above.
+ *
+ * See CASSANDRA-7066 for full details.
+ */
+class LogTransaction extends Transactional.AbstractTransactional implements Transactional
+{
+ private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class);
+
+ /**
+ * If the format of the lines in the transaction log is wrong or the checksum
+ * does not match, then we throw this exception.
+ */
+ public static final class CorruptTransactionLogException extends RuntimeException
+ {
+ public final LogFile file;
+
+ public CorruptTransactionLogException(String message, LogFile file)
+ {
+ super(message);
+ this.file = file;
+ }
+ }
+
+ private final Tracker tracker;
+ private final LogFile data;
+ private final Ref<LogTransaction> selfRef;
+ // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+ // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+ // Additionally, we need to make sure to delete the data file first, so on restart the others
+ // will be recognized as GCable.
+ private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
+
+ LogTransaction(OperationType opType, CFMetaData metadata)
+ {
+ this(opType, metadata, null);
+ }
+
+ LogTransaction(OperationType opType, CFMetaData metadata, Tracker tracker)
+ {
+ this(opType, new Directories(metadata), tracker);
+ }
+
+ LogTransaction(OperationType opType, Directories directories, Tracker tracker)
+ {
+ this(opType, directories.getDirectoryForNewSSTables(), tracker);
+ }
+
+ LogTransaction(OperationType opType, File folder, Tracker tracker)
+ {
+ this.tracker = tracker;
+ int folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
+ this.data = new LogFile(opType, folder, folderDescriptor, UUIDGen.getTimeUUID());
+ this.selfRef = new Ref<>(this, new TransactionTidier(data, folderDescriptor));
+
+ if (logger.isDebugEnabled())
+ logger.debug("Created transaction logs with id {}", data.id);
+ }
+
+ /**
+ * Track a reader as new.
+ **/
+ void trackNew(SSTable table)
+ {
+ data.add(Type.ADD, table);
+ }
+
+ /**
+ * Stop tracking a reader as new.
+ */
+ void untrackNew(SSTable table)
+ {
+ data.remove(Type.ADD, table);
+ }
+
+ /**
+ * Schedule a reader for deletion as soon as it is fully unreferenced.
+ */
+ SSTableTidier obsoleted(SSTableReader reader)
+ {
+ if (data.contains(Type.ADD, reader))
+ {
+ if (data.contains(Type.REMOVE, reader))
+ throw new IllegalArgumentException();
+
+ return new SSTableTidier(reader, true, this);
+ }
+
+ data.add(Type.REMOVE, reader);
+
+ if (tracker != null)
+ tracker.notifyDeleting(reader);
+
+ return new SSTableTidier(reader, false, this);
+ }
+
+ OperationType getType()
+ {
+ return data.getType();
+ }
+
+ UUID getId()
+ {
+ return data.getId();
+ }
+
+ @VisibleForTesting
+ String getDataFolder()
+ {
+ return data.folder.getPath();
+ }
+
+ @VisibleForTesting
+ LogFile getLogFile()
+ {
+ return data;
+ }
+
+ static void delete(File file)
+ {
+ try
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Deleting {}", file);
+
+ Files.delete(file.toPath());
+ }
+ catch (NoSuchFileException e)
+ {
+ logger.error("Unable to delete {} as it does not exist", file);
+ }
+ catch (IOException e)
+ {
+ logger.error("Unable to delete {}", file, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * The transaction tidier.
+ *
+ * When the transaction reference is fully released we try to delete all the obsolete files
+ * depending on the transaction result, as well as the transaction log file.
+ */
+ private static class TransactionTidier implements RefCounted.Tidy, Runnable
+ {
+ private final LogFile data;
+ private final int folderDescriptor;
+
+ TransactionTidier(LogFile data, int folderDescriptor)
+ {
+ this.data = data;
+ this.folderDescriptor = folderDescriptor;
+ }
+
+ public void tidy() throws Exception
+ {
+ run();
+ }
+
+ public String name()
+ {
+ return data.toString();
+ }
+
+ public void run()
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Removing files for transaction {}", name());
+
+ assert data.completed() : "Expected a completed transaction: " + data;
+
+ Throwable err = data.removeUnfinishedLeftovers(null);
+
+ if (err != null)
+ {
+ logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err);
+ failedDeletions.add(this);
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Closing file transaction {}", name());
+ CLibrary.tryCloseFD(folderDescriptor);
+ }
+ }
+ }
+
+ static class Obsoletion
+ {
+ final SSTableReader reader;
+ final SSTableTidier tidier;
+
+ Obsoletion(SSTableReader reader, SSTableTidier tidier)
+ {
+ this.reader = reader;
+ this.tidier = tidier;
+ }
+ }
+
+ /**
+ * The SSTableReader tidier. When a reader is fully released and no longer referenced
+ * by anyone, we run this. It keeps a reference to the parent transaction and releases
+ * it when done, so that the final transaction cleanup can run when all obsolete readers
+ * are released.
+ */
+ public static class SSTableTidier implements Runnable
+ {
+ // must not retain a reference to the SSTableReader, else leak detection cannot kick in
+ private final Descriptor desc;
+ private final long sizeOnDisk;
+ private final Tracker tracker;
+ private final boolean wasNew;
+ private final Ref<LogTransaction> parentRef;
+
+ public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent)
+ {
+ this.desc = referent.descriptor;
+ this.sizeOnDisk = referent.bytesOnDisk();
+ this.tracker = parent.tracker;
+ this.wasNew = wasNew;
+ this.parentRef = parent.selfRef.tryRef();
+ }
+
+ public void run()
+ {
+ SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+
+ try
+ {
+ // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
+ File datafile = new File(desc.filenameFor(Component.DATA));
+
+ delete(datafile);
+ // let the remainder be cleaned up by delete
+ SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc);
+ failedDeletions.add(this);
+ return;
+ }
+
+ if (tracker != null && tracker.cfstore != null && !wasNew)
+ tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
+
+ // release the referent to the parent so that all transaction files can be released
+ parentRef.release();
+ }
+
+ public void abort()
+ {
+ parentRef.release();
+ }
+ }
+
+
+ static void rescheduleFailedDeletions()
+ {
+ Runnable task;
+ while ( null != (task = failedDeletions.poll()))
+ ScheduledExecutors.nonPeriodicTasks.submit(task);
+
+ // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS.
+ SnapshotDeletingTask.rescheduleFailedTasks();
+ }
+
+ static void waitForDeletions()
+ {
+ FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS));
+ }
+
+ @VisibleForTesting
+ Throwable complete(Throwable accumulate)
+ {
+ try
+ {
+ accumulate = selfRef.ensureReleased(accumulate);
+ return accumulate;
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed to complete file transaction {}", getId(), t);
+ return Throwables.merge(accumulate, t);
+ }
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ data.commit();
+ return complete(accumulate);
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ data.abort();
+ return complete(accumulate);
+ }
+
+ protected void doPrepare() { }
+
+ /**
+ * Called on startup to scan existing folders for any unfinished leftovers of
+ * operations that were ongoing when the process exited. Also called by the standalone
+ * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil.
+ *
+ */
+ static void removeUnfinishedLeftovers(CFMetaData metadata)
+ {
+ for (File dir : new Directories(metadata).getCFDirectories())
+ {
+ int folderDescriptor = CLibrary.tryOpenDirectory(dir.getPath());
+ try
+ {
+ File[] logs = dir.listFiles(LogFile::isLogFile);
+
+ for (File log : logs)
+ {
+ LogFile data = LogFile.make(log, folderDescriptor);
+ data.readRecords();
+ if (data.verify())
+ {
+ Throwable failure = data.removeUnfinishedLeftovers(null);
+ if (failure != null)
+ logger.error("Failed to remove unfinished transaction leftovers for log {}", log, failure);
+ }
+ else
+ {
+ logger.error("Unexpected disk state: failed to read transaction log {}", log);
+ }
+ }
+ }
+ finally
+ {
+ CLibrary.tryCloseFD(folderDescriptor);
+ }
+ }
+ }
+}
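A hedged sketch of how an offline operation drives this class indirectly through LifecycleTransaction (per the class comment above, it should never be used outside of LT). It assumes a configured Cassandra environment supplying the metadata and the sstable writing plumbing; the WRITE operation type and the method name are illustrative:

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.SSTable;

    public class OfflineWriteSketch
    {
        // Track a newly written sstable inside an offline transaction so that, if the
        // process dies or the txn aborts, the files are classified as temporary and
        // removed by removeUnfinishedLeftovers on the next start-up.
        public static void writeWithTxn(CFMetaData metadata, SSTable newTable)
        {
            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, metadata))
            {
                txn.trackNew(newTable);   // appends an ADD record to the txn log
                // ... write and finish the sstable contents here ...
                txn.finish();             // commit: new files are kept and the log is tidied
            }
            // if finish() is never reached, close() aborts the transaction and the ADD'd
            // files become deletable leftovers
        }
    }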
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351c7cac/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index d028493..ffb71ee 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -224,7 +224,7 @@ public class Tracker
*/
public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
{
- try (TransactionLog txnLogs = new TransactionLog(operationType, cfstore.metadata, this))
+ try (LogTransaction txnLogs = new LogTransaction(operationType, cfstore.metadata, this))
{
Pair<View, View> result = apply(view -> {
Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
@@ -236,7 +236,7 @@ public class Tracker
// It is important that any method accepting/returning a Throwable never throws an exception, and does its best
// to complete the instructions given to it
- List<TransactionLog.Obsoletion> obsoletions = new ArrayList<>();
+ List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
try
{