You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/24 18:32:13 UTC
[4/5] cassandra git commit: Introduce safer durable sstable
membership management (and simplify cleanup of compaction leftovers)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index ba85eef..14cb795 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -52,17 +51,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
private long currentBytesToWrite;
private int currentRatioIndex = 0;
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
+ this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
}
@SuppressWarnings("resource")
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
{
super(cfs, txn, nonExpiredSSTables, false);
this.allSSTables = txn.originals();
- totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+ totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
double[] potentialRatios = new double[20];
double currentRatio = 1;
for (int i = 0; i < potentialRatios.length; i++)
@@ -86,13 +85,14 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- cfs.partitioner,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ currentPartitionsToWrite,
+ minRepairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
sstableWriter.switchWriter(writer);
logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
@@ -109,13 +109,14 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
@SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
- currentPartitionsToWrite,
- minRepairedAt,
- cfs.metadata,
- cfs.partitioner,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+ currentPartitionsToWrite,
+ minRepairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+ SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+ txn);
sstableWriter.switchWriter(writer);
logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index 536e13c..2b94d7a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -23,6 +23,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
import static com.google.common.base.Predicates.*;
import static com.google.common.collect.Iterables.any;
@@ -70,6 +71,16 @@ class Helpers
* A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
* @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
*/
+ static void setupKeyCache(Iterable<SSTableReader> readers)
+ {
+ for (SSTableReader reader : readers)
+ reader.setupKeyCache();
+ }
+
+ /**
+ * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
+ * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
+ */
static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate)
{
for (SSTableReader reader : readers)
@@ -105,18 +116,51 @@ class Helpers
assert !reader.isReplaced();
}
- /**
- * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety
- * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
- */
- static Throwable markObsolete(Tracker tracker, Iterable<SSTableReader> readers, Throwable accumulate)
+ static Throwable markObsolete(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+ {
+ if (obsoletions == null || obsoletions.isEmpty())
+ return accumulate;
+
+ for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+ {
+ try
+ {
+ obsoletion.reader.markObsolete(obsoletion.tidier);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+ }
+ return accumulate;
+ }
+
+ static Throwable prepareForObsoletion(Iterable<SSTableReader> readers, TransactionLogs txnLogs, List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
{
for (SSTableReader reader : readers)
{
try
{
- boolean firstToCompact = reader.markObsolete(tracker);
- assert firstToCompact : reader + " was already marked compacted";
+ obsoletions.add(new TransactionLogs.Obsoletion(reader, txnLogs.obsoleted(reader)));
+ }
+ catch (Throwable t)
+ {
+ accumulate = Throwables.merge(accumulate, t);
+ }
+ }
+ return accumulate;
+ }
+
+ static Throwable abortObsoletion(List<TransactionLogs.Obsoletion> obsoletions, Throwable accumulate)
+ {
+ if (obsoletions == null || obsoletions.isEmpty())
+ return accumulate;
+
+ for (TransactionLogs.Obsoletion obsoletion : obsoletions)
+ {
+ try
+ {
+ obsoletion.tidier.abort();
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index e14e2a1..b743633 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.lifecycle;
+import java.io.File;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
@@ -27,7 +28,9 @@ import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -44,12 +47,18 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, for correct behaviour its commit MUST occur before
+ * any others, since it may legitimately fail. This is consistent with the Transactional API, which permits one failing
+ * action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit() phase only take
+ * actions that can be rolled back.
+ */
public class LifecycleTransaction extends Transactional.AbstractTransactional
{
private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
/**
- * a class that represents accumulated modifications to the Tracker.
+ * A class that represents accumulated modifications to the Tracker.
* has two instances, one containing modifications that are "staged" (i.e. invisible)
* and one containing those "logged" that have been made visible through a call to checkpoint()
*/
@@ -86,7 +95,8 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
}
public final Tracker tracker;
- private final OperationType operationType;
+ // The transaction logs keep track of new and old sstable files
+ private final TransactionLogs transactionLogs;
// the original readers this transaction was opened over, and that it guards
// (no other transactions may operate over these readers concurrently)
private final Set<SSTableReader> originals = new HashSet<>();
@@ -95,13 +105,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
// the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
// same version of a reader. potentially a dangerous property if there are reference counting bugs
// as they won't be caught until the transaction's lifespan is over.
- private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<UniqueIdentifier, Boolean>());
+ private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<>());
// changes that have been made visible
private final State logged = new State();
// changes that are pending
private final State staged = new State();
+ // the tidiers and their readers, to be used for marking readers obsoleted during a commit
+ private List<TransactionLogs.Obsoletion> obsoletions;
+
/**
* construct a Transaction for use in an offline operation
*/
@@ -122,10 +135,33 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return new LifecycleTransaction(dummy, operationType, readers);
}
+ /**
+ * construct an empty Transaction with no existing readers
+ */
+ public static LifecycleTransaction offline(OperationType operationType, CFMetaData metadata)
+ {
+ Tracker dummy = new Tracker(null, false);
+ return new LifecycleTransaction(dummy, new TransactionLogs(operationType, metadata, dummy), Collections.emptyList());
+ }
+
+ /**
+ * construct an empty Transaction with no existing readers, whose TransactionLogs is created for the given operationFolder
+ */
+ public static LifecycleTransaction offline(OperationType operationType, File operationFolder)
+ {
+ Tracker dummy = new Tracker(null, false);
+ return new LifecycleTransaction(dummy, new TransactionLogs(operationType, operationFolder, dummy), Collections.emptyList());
+ }
+
LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
{
+ this(tracker, new TransactionLogs(operationType, getMetadata(tracker, readers), tracker), readers);
+ }
+
+ LifecycleTransaction(Tracker tracker, TransactionLogs transactionLogs, Iterable<SSTableReader> readers)
+ {
this.tracker = tracker;
- this.operationType = operationType;
+ this.transactionLogs = transactionLogs;
for (SSTableReader reader : readers)
{
originals.add(reader);
@@ -134,6 +170,36 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
}
}
+ private static CFMetaData getMetadata(Tracker tracker, Iterable<SSTableReader> readers)
+ {
+ if (tracker.cfstore != null)
+ return tracker.cfstore.metadata;
+
+ for (SSTableReader reader : readers)
+ {
+ if (reader.metadata != null)
+ return reader.metadata;
+ }
+
+ assert false : "Expected cfstore or at least one reader with metadata";
+ return null;
+ }
+
+ public TransactionLogs logs()
+ {
+ return transactionLogs;
+ }
+
+ public OperationType opType()
+ {
+ return transactionLogs.getType();
+ }
+
+ public UUID opId()
+ {
+ return transactionLogs.getId();
+ }
+
public void doPrepare()
{
// note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
@@ -141,6 +207,11 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
// (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
// it may break this use case, and care is needed
checkpoint();
+
+ // prepare obsolete readers for obsoletion, but only those that were part of the original set,
+ // since those that are not originals are early readers that share the same descriptor as the final readers
+ maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), transactionLogs, obsoletions = new ArrayList<>(), null));
+ transactionLogs.prepareToCommit();
}
/**
@@ -149,16 +220,23 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
public Throwable doCommit(Throwable accumulate)
{
assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";
-
logger.debug("Committing update:{}, obsolete:{}", staged.update, staged.obsolete);
+ // accumulate must be null if we have been used correctly, so fail immediately if it is not
+ maybeFail(accumulate);
+
+ // transaction log commit failure means we must abort; safe commit is not possible
+ maybeFail(transactionLogs.commit(null));
+
// this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
// we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
// and notification status for the obsolete and new files
- accumulate = markObsolete(tracker, logged.obsolete, accumulate);
+
+ accumulate = markObsolete(obsoletions, accumulate);
accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
accumulate = release(selfRefs(logged.obsolete), accumulate);
- accumulate = tracker.notifySSTablesChanged(originals, logged.update, operationType, accumulate);
+ accumulate = tracker.notifySSTablesChanged(originals, logged.update, transactionLogs.getType(), accumulate);
+
return accumulate;
}
@@ -170,15 +248,20 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
if (logger.isDebugEnabled())
logger.debug("Aborting transaction over {}, with ({},{}) logged and ({},{}) staged", originals, logged.update, logged.obsolete, staged.update, staged.obsolete);
+ accumulate = abortObsoletion(obsoletions, accumulate);
+
if (logged.isEmpty() && staged.isEmpty())
- return accumulate;
+ return transactionLogs.abort(accumulate);
// mark obsolete all readers that are not versions of those present in the original set
Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
logger.debug("Obsoleting {}", obsolete);
- // we don't pass the tracker in for the obsoletion, since these readers have never been notified externally
- // nor had their size accounting affected
- accumulate = markObsolete(null, obsolete, accumulate);
+
+ accumulate = prepareForObsoletion(obsolete, transactionLogs, obsoletions = new ArrayList<>(), accumulate);
+ // it's safe to abort even if committed (see maybeFail in doCommit() above); in this case it will just report
+ // a failure to abort, which is useful information to have when debugging
+ accumulate = transactionLogs.abort(accumulate);
+ accumulate = markObsolete(obsoletions, accumulate);
// replace all updated readers with a version restored to its original state
accumulate = tracker.apply(updateLiveSet(logged.update, restoreUpdatedOriginals()), accumulate);
@@ -189,6 +272,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
// any _staged_ obsoletes should either be in staged.update already, and dealt with there,
// or is still in its original form (so left as is); in either case no extra action is needed
accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);
+
logged.clear();
staged.clear();
return accumulate;
@@ -270,8 +354,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
}
/**
- * mark this reader as for obsoletion. this does not actually obsolete the reader until commit() is called,
- * but on checkpoint() the reader will be removed from the live set
+ * mark this reader for obsoletion: on checkpoint() the reader will be removed from the live set
*/
public void obsolete(SSTableReader reader)
{
@@ -312,8 +395,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
*/
private Iterable<SSTableReader> fresh()
{
- return filterOut(staged.update,
- originals, logged.update);
+ return filterOut(staged.update, originals, logged.update);
}
/**
@@ -332,14 +414,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
private List<SSTableReader> restoreUpdatedOriginals()
{
Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
- return ImmutableList.copyOf(transform(torestore,
- new Function<SSTableReader, SSTableReader>()
- {
- public SSTableReader apply(SSTableReader reader)
- {
- return current(reader).cloneWithNewStart(reader.first, null);
- }
- }));
+ return ImmutableList.copyOf(transform(torestore, (reader) -> current(reader).cloneWithNewStart(reader.first, null)));
}
/**
@@ -415,7 +490,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
originals.remove(reader);
marked.remove(reader);
}
- return new LifecycleTransaction(tracker, operationType, readers);
+ return new LifecycleTransaction(tracker, transactionLogs.getType(), readers);
}
/**
@@ -446,6 +521,31 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
return getFirst(originals, null);
}
+ public void trackNew(SSTable table)
+ {
+ transactionLogs.trackNew(table);
+ }
+
+ public void untrackNew(SSTable table)
+ {
+ transactionLogs.untrackNew(table);
+ }
+
+ public static void removeUnfinishedLeftovers(CFMetaData metadata)
+ {
+ TransactionLogs.removeUnfinishedLeftovers(metadata);
+ }
+
+ public static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
+ {
+ return TransactionLogs.getTemporaryFiles(metadata, folder);
+ }
+
+ public static Set<File> getLogFiles(CFMetaData metadata)
+ {
+ return TransactionLogs.getLogFiles(metadata);
+ }
+
// a class representing the current state of the reader within this transaction, encoding the actions both logged
// and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
// indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
@@ -453,7 +553,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
@VisibleForTesting
public static class ReaderState
{
- public static enum Action
+ public enum Action
{
UPDATED, OBSOLETED, NONE;
public static Action get(boolean updated, boolean obsoleted)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index f1c4685..241eb4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static com.google.common.base.Predicates.and;
@@ -162,9 +163,11 @@ public class Tracker
accumulate = merge(accumulate, t);
}
}
+
StorageMetrics.load.inc(add - subtract);
cfstore.metric.liveDiskSpaceUsed.inc(add - subtract);
- // we don't subtract from total until the sstable is deleted
+
+ // we don't subtract from total until the sstable is deleted, see TransactionLogs.SSTableTidier
cfstore.metric.totalDiskSpaceUsed.inc(add);
return accumulate;
}
@@ -224,29 +227,47 @@ public class Tracker
*/
public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
{
- Pair<View, View> result = apply(new Function<View, View>()
+ try (TransactionLogs txnLogs = new TransactionLogs(operationType, cfstore.metadata, this))
{
- public View apply(View view)
- {
+ Pair<View, View> result = apply(view -> {
Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
return updateLiveSet(toremove, emptySet()).apply(view);
- }
- });
+ });
- Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
- assert Iterables.all(removed, remove);
+ Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
+ assert Iterables.all(removed, remove);
- if (!removed.isEmpty())
+ // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
+ // to complete the instructions given to it
+ List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+ accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+ try
+ {
+ txnLogs.finish();
+ if (!removed.isEmpty())
+ {
+ accumulate = markObsolete(obsoletions, accumulate);
+ accumulate = updateSizeTracking(removed, emptySet(), accumulate);
+ accumulate = release(selfRefs(removed), accumulate);
+ // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
+ accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.getType(), accumulate);
+ }
+ }
+ catch (Throwable t)
+ {
+ accumulate = abortObsoletion(obsoletions, accumulate);
+ accumulate = Throwables.merge(accumulate, t);
+ }
+ }
+ catch (Throwable t)
{
- // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
- accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), operationType, accumulate);
- accumulate = updateSizeTracking(removed, emptySet(), accumulate);
- accumulate = markObsolete(this, removed, accumulate);
- accumulate = release(selfRefs(removed), accumulate);
+ accumulate = Throwables.merge(accumulate, t);
}
+
return accumulate;
}
+
/**
* Removes every SSTable in the directory from the Tracker's view.
* @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
@@ -370,7 +391,6 @@ public class Tracker
sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
}
-
// NOTIFICATION
Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
new file mode 100644
index 0000000..ab6c72a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/TransactionLogs.java
@@ -0,0 +1,786 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Blocker;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+/**
+ * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction,
+ * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. This is consistent
+ * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also
+ * *requires* that the prepareToCommit() phase only take actions that can be rolled back.
+ *
+ * A class that tracks sstable files involved in a transaction across sstables:
+ * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails.
+ *
+ * Two log files, NEW and OLD, contain new and old sstable files respectively. The log files also track each
+ * other by referencing each other's path in their contents.
+ *
+ * If the transaction finishes successfully:
+ * - the OLD transaction file is deleted along with its contents, this includes the NEW transaction file.
+ * Before deleting we must let the SSTableTidier instances run first for any old readers that are being obsoleted
+ * (mark as compacted) by the transaction, see LifecycleTransaction
+ *
+ * If the transaction is aborted:
+ * - the NEW transaction file and its contents are deleted, this includes the OLD transaction file
+ *
+ * On start-up:
+ * - If we find a NEW transaction file, it means the transaction did not complete and we delete the NEW file and its contents
+ * - If we find an OLD transaction file but not a NEW file, it means the transaction must have completed and so we delete
+ * all the contents of the OLD file, if they still exist, and the OLD file itself.
+ *
+ * See CASSANDRA-7066 for full details.
+ */
+public class TransactionLogs extends Transactional.AbstractTransactional implements Transactional
+{
+ private static final Logger logger = LoggerFactory.getLogger(TransactionLogs.class);
+
+ /**
+ * A single transaction log file, either NEW or OLD.
+ */
+ final static class TransactionFile
+ {
+ static String EXT = ".log";
+ static char SEP = '_';
+ static String REGEX_STR = String.format("^(.*)_(.*)_(%s|%s)%s$", Type.NEW.txt, Type.OLD.txt, EXT);
+ static Pattern REGEX = Pattern.compile(REGEX_STR); //(opname)_(id)_(new|old).data
+
+ public enum Type
+ {
+ NEW (0, "new"),
+ OLD (1, "old");
+
+ public final int idx;
+ public final String txt;
+
+ Type(int idx, String txt)
+ {
+ this.idx = idx;
+ this.txt = txt;
+ }
+ };
+
+ public final Type type;
+ public final File file;
+ public final TransactionData parent;
+ public final Set<String> lines = new HashSet<>();
+
+ public TransactionFile(Type type, TransactionData parent)
+ {
+ this.type = type;
+ this.file = new File(parent.getFileName(type));
+ this.parent = parent;
+
+ if (exists())
+ lines.addAll(FileUtils.readLines(file));
+ }
+
+ public boolean add(SSTable table)
+ {
+ return add(table.descriptor.baseFilename());
+ }
+
+ private boolean add(String path)
+ {
+ String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), path);
+ if (lines.contains(relativePath))
+ return false;
+
+ lines.add(relativePath);
+ FileUtils.append(file, relativePath);
+ return true;
+ }
+
+ public void remove(SSTable table)
+ {
+ String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
+ assert lines.contains(relativePath) : String.format("%s is not tracked by %s", relativePath, file);
+
+ lines.remove(relativePath);
+ delete(relativePath);
+ }
+
+ public boolean contains(SSTable table)
+ {
+ String relativePath = FileUtils.getRelativePath(parent.getParentFolder(), table.descriptor.baseFilename());
+ return lines.contains(relativePath);
+ }
+
+ private void deleteContents()
+ {
+ deleteOpposite();
+
+ // we sync the parent file descriptor between opposite log deletion and
+ // contents deletion to ensure there is a happens before edge between them
+ parent.sync();
+
+ lines.forEach(line -> delete(line));
+ lines.clear();
+ }
+
+ private void deleteOpposite()
+ {
+ Type oppositeType = type == Type.NEW ? Type.OLD : Type.NEW;
+ String oppositeFile = FileUtils.getRelativePath(parent.getParentFolder(), parent.getFileName(oppositeType));
+ assert lines.contains(oppositeFile) : String.format("Could not find %s amongst lines", oppositeFile);
+
+ delete(oppositeFile);
+ lines.remove(oppositeFile);
+ }
+
+ private void delete(String relativePath)
+ {
+ getTrackedFiles(relativePath).forEach(file -> TransactionLogs.delete(file));
+ }
+
+ public Set<File> getTrackedFiles()
+ {
+ Set<File> ret = new HashSet<>();
+ FileUtils.readLines(file).forEach(line -> ret.addAll(getTrackedFiles(line)));
+ ret.add(file);
+ return ret;
+ }
+
+ private List<File> getTrackedFiles(String relativePath)
+ {
+     // a line names either one existing file or a name prefix shared by several files
+     File file = new File(StringUtils.join(parent.getParentFolder(), File.separator, relativePath));
+     if (file.exists())
+         return new ArrayList<File>(Arrays.asList(file));
+
+     // listFiles returns null if the folder is missing or unreadable: treat that as no matches
+     File[] matches = new File(parent.getParentFolder()).listFiles((dir, name) -> {
+         return name.startsWith(relativePath);
+     });
+     return matches == null ? new ArrayList<File>() : new ArrayList<File>(Arrays.asList(matches));
+ }
+
+ public void delete(boolean deleteContents)
+ {
+ assert file.exists() : String.format("Expected %s to exists", file);
+
+ if (deleteContents)
+ deleteContents();
+
+ // we sync the parent file descriptor between contents and log deletion
+ // to ensure there is a happens before edge between them
+ parent.sync();
+
+ TransactionLogs.delete(file);
+ }
+
+ public boolean exists()
+ {
+ return file.exists();
+ }
+ }
+
+ /**
+ * We split the transaction data from the behavior because we need
+ * to reconstruct any left-overs and clean them up, as well as work
+ * out which files are temporary. So for these cases we don't want the full
+ * transactional behavior, plus it's handy for the TransactionTidier.
+ */
+ final static class TransactionData implements AutoCloseable
+ {
+ private final OperationType opType;
+ private final UUID id;
+ private final File folder;
+ private final TransactionFile[] files;
+ private int folderDescriptor;
+ private boolean succeeded;
+
+ static TransactionData make(File logFile)
+ {
+ Matcher matcher = TransactionFile.REGEX.matcher(logFile.getName());
+ assert matcher.matches();
+
+ OperationType operationType = OperationType.fromFileName(matcher.group(1));
+ UUID id = UUID.fromString(matcher.group(2));
+
+ return new TransactionData(operationType, logFile.getParentFile(), id);
+ }
+
+ TransactionData(OperationType opType, File folder, UUID id)
+ {
+ this.opType = opType;
+ this.id = id;
+ this.folder = folder;
+ this.files = new TransactionFile[TransactionFile.Type.values().length];
+ for (TransactionFile.Type t : TransactionFile.Type.values())
+ this.files[t.idx] = new TransactionFile(t, this);
+
+ this.folderDescriptor = CLibrary.tryOpenDirectory(folder.getPath());
+ this.succeeded = !newLog().exists() && oldLog().exists();
+ }
+
+ public void succeeded(boolean succeeded)
+ {
+ this.succeeded = succeeded;
+ }
+
+ public void close()
+ {
+ if (folderDescriptor > 0)
+ {
+ CLibrary.tryCloseFD(folderDescriptor);
+ folderDescriptor = -1;
+ }
+ }
+
+ void crossReference()
+ {
+ newLog().add(oldLog().file.getPath());
+ oldLog().add(newLog().file.getPath());
+ }
+
+ void sync()
+ {
+ if (folderDescriptor > 0)
+ CLibrary.trySync(folderDescriptor);
+ }
+
+ TransactionFile newLog()
+ {
+ return files[TransactionFile.Type.NEW.idx];
+ }
+
+ TransactionFile oldLog()
+ {
+ return files[TransactionFile.Type.OLD.idx];
+ }
+
+ OperationType getType()
+ {
+ return opType;
+ }
+
+ UUID getId()
+ {
+ return id;
+ }
+
+ Throwable removeUnfinishedLeftovers(Throwable accumulate)
+ {
+ try
+ {
+ if (succeeded)
+ oldLog().delete(true);
+ else
+ newLog().delete(true);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+
+ return accumulate;
+ }
+
+ Set<File> getTemporaryFiles()
+ {
+ sync();
+
+ if (newLog().exists())
+ return newLog().getTrackedFiles();
+ else
+ return oldLog().getTrackedFiles();
+ }
+
+ String getFileName(TransactionFile.Type type)
+ {
+ String fileName = StringUtils.join(opType.fileName,
+ TransactionFile.SEP,
+ id.toString(),
+ TransactionFile.SEP,
+ type.txt,
+ TransactionFile.EXT);
+ return StringUtils.join(folder, File.separator, fileName);
+ }
+
+ String getParentFolder()
+ {
+ return folder.getParent();
+ }
+
+ static boolean isLogFile(String name)
+ {
+ return TransactionFile.REGEX.matcher(name).matches();
+ }
+ }
+
+ private final Tracker tracker;
+ private final TransactionData data;
+ private final Ref<TransactionLogs> selfRef;
+ // Deleting sstables is tricky because the mmapping might not have been finalized yet,
+ // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
+ // Additionally, we need to make sure to delete the data file first, so on restart the others
+ // will be recognized as GCable.
+ private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>();
+ private static final Blocker blocker = new Blocker();
+
+ TransactionLogs(OperationType opType, CFMetaData metadata)
+ {
+ this(opType, metadata, null);
+ }
+
+ TransactionLogs(OperationType opType, CFMetaData metadata, Tracker tracker)
+ {
+ this(opType, new Directories(metadata), tracker);
+ }
+
+ TransactionLogs(OperationType opType, Directories directories, Tracker tracker)
+ {
+ this(opType, directories.getDirectoryForNewSSTables(), tracker);
+ }
+
+ TransactionLogs(OperationType opType, File folder, Tracker tracker)
+ {
+ this.tracker = tracker;
+ this.data = new TransactionData(opType,
+ Directories.getTransactionsDirectory(folder),
+ UUIDGen.getTimeUUID());
+ this.selfRef = new Ref<>(this, new TransactionTidier(data));
+
+ data.crossReference();
+ if (logger.isDebugEnabled())
+ logger.debug("Created transaction logs with id {}", data.id);
+ }
+
+ /**
+  * Track a table as new. Throws IllegalStateException if it is already tracked as new.
+  **/
+ void trackNew(SSTable table)
+ {
+     // add() returns false when the table is already present, so a single call
+     // both checks for and records membership; no second add() is needed
+     if (!data.newLog().add(table))
+         throw new IllegalStateException(table + " is already tracked as new");
+ }
+
+ /**
+ * Stop tracking a reader as new.
+ */
+ void untrackNew(SSTable table)
+ {
+ data.newLog().remove(table);
+ }
+
+ /**
+  * Schedule a reader for deletion as soon as it is fully unreferenced and the transaction
+  * has been committed. Returns the tidier that performs the deletion.
+  */
+ SSTableTidier obsoleted(SSTableReader reader)
+ {
+     if (data.newLog().contains(reader))
+     {
+         if (data.oldLog().contains(reader))
+             throw new IllegalArgumentException(reader + " is tracked as both new and old");
+         // a reader already tracked as new is not added to the old log
+         return new SSTableTidier(reader, true, this);
+     }
+     // add() returns false when the reader is already present in the old log
+     if (!data.oldLog().add(reader))
+         throw new IllegalStateException(reader + " is already tracked as old");
+
+     if (tracker != null)
+         tracker.notifyDeleting(reader);
+
+     return new SSTableTidier(reader, false, this);
+ }
+
+ OperationType getType()
+ {
+ return data.getType();
+ }
+
+ UUID getId()
+ {
+ return data.getId();
+ }
+
+ @VisibleForTesting
+ String getDataFolder()
+ {
+ return data.getParentFolder();
+ }
+
+ @VisibleForTesting
+ String getLogsFolder()
+ {
+ return StringUtils.join(getDataFolder(), File.separator, Directories.TRANSACTIONS_SUBDIR);
+ }
+
+ @VisibleForTesting
+ TransactionData getData()
+ {
+ return data;
+ }
+
+ private static void delete(File file)
+ {
+ try
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Deleting {}", file);
+
+ Files.delete(file.toPath());
+ }
+ catch (NoSuchFileException e)
+ {
+ logger.warn("Unable to delete {} as it does not exist", file);
+ }
+ catch (IOException e)
+ {
+ logger.error("Unable to delete {}", file, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * The transaction tidier.
+ *
+ * When the transaction reference is fully released we try to delete all the obsolete files
+ * depending on the transaction result.
+ */
+ private static class TransactionTidier implements RefCounted.Tidy, Runnable
+ {
+ private final TransactionData data;
+
+ public TransactionTidier(TransactionData data)
+ {
+ this.data = data;
+ }
+
+ public void tidy() throws Exception
+ {
+ run();
+ }
+
+ public String name()
+ {
+ return data.id.toString();
+ }
+
+ public void run()
+ {
+     if (logger.isDebugEnabled())
+         logger.debug("Removing files for transaction {}", name());
+     // delete the files made obsolete by the transaction outcome; errors are returned, not thrown
+     Throwable err = data.removeUnfinishedLeftovers(null);
+     // on failure leave the transaction data open and queue this tidier for a later retry
+     if (err != null)
+     {
+         logger.info("Failed deleting files for transaction {}, we'll retry after GC and on server restart", name(), err);
+         failedDeletions.add(this);
+     }
+     else
+     {
+         if (logger.isDebugEnabled())
+             logger.debug("Closing file transaction {}", name());
+         data.close();
+     }
+ }
+ }
+
+ static class Obsoletion
+ {
+ final SSTableReader reader;
+ final SSTableTidier tidier;
+
+ public Obsoletion(SSTableReader reader, SSTableTidier tidier)
+ {
+ this.reader = reader;
+ this.tidier = tidier;
+ }
+ }
+
+ /**
+ * The SSTableReader tidier. When a reader is fully released and no longer referenced
+ * by anyone, we run this. It keeps a reference to the parent transaction and releases
+ * it when done, so that the final transaction cleanup can run when all obsolete readers
+ * are released.
+ */
+ public static class SSTableTidier implements Runnable
+ {
+ // must not retain a reference to the SSTableReader, else leak detection cannot kick in
+ private final Descriptor desc;
+ private final long sizeOnDisk;
+ private final Tracker tracker;
+ private final boolean wasNew;
+ private final Ref<TransactionLogs> parentRef;
+
+ public SSTableTidier(SSTableReader referent, boolean wasNew, TransactionLogs parent)
+ {
+ this.desc = referent.descriptor;
+ this.sizeOnDisk = referent.bytesOnDisk();
+ this.tracker = parent.tracker;
+ this.wasNew = wasNew;
+ this.parentRef = parent.selfRef.tryRef();
+ }
+
+ public void run()
+ {
+     // NOTE(review): presumably waits here while deletions are paused via pauseDeletions - confirm against Blocker
+     blocker.ask();
+     SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
+
+     try
+     {
+         // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier
+         File datafile = new File(desc.filenameFor(Component.DATA));
+         delete(datafile);
+         // let the remainder be cleaned up by delete
+         SSTable.delete(desc, SSTable.discoverComponentsFor(desc));
+     }
+     catch (Throwable t)
+     {
+         // pass the throwable as the last argument so the cause of the failure is logged
+         logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc, t);
+         failedDeletions.add(this);
+         return;
+     }
+
+     if (tracker != null && !wasNew)
+         tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk);
+
+     // release the reference to the parent so that all the transaction files can be released
+     parentRef.release();
+ }
+
+ public void abort()
+ {
+ parentRef.release();
+ }
+ }
+
+ /**
+ * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
+ * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
+ */
+ public static void rescheduleFailedDeletions()
+ {
+ Runnable task;
+ while ( null != (task = failedDeletions.poll()))
+ ScheduledExecutors.nonPeriodicTasks.submit(task);
+ }
+
+ /**
+ * Deletions run on the nonPeriodicTasks executor (both failedDeletions and global tidiers in SSTableReader),
+ * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
+ */
+ public static void waitForDeletions()
+ {
+ FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
+ }, 0, TimeUnit.MILLISECONDS));
+ }
+
+ @VisibleForTesting
+ public static void pauseDeletions(boolean stop)
+ {
+ blocker.block(stop);
+ }
+
+ private Throwable complete(Throwable accumulate)
+ {
+ try
+ {
+ try
+ {
+ if (data.succeeded)
+ data.newLog().delete(false);
+ else
+ data.oldLog().delete(false);
+ }
+ catch (Throwable t)
+ {
+ accumulate = merge(accumulate, t);
+ }
+
+ accumulate = selfRef.ensureReleased(accumulate);
+ return accumulate;
+ }
+ catch (Throwable t)
+ {
+ logger.error("Failed to complete file transaction {}", getId(), t);
+ return Throwables.merge(accumulate, t);
+ }
+ }
+
+ protected Throwable doCommit(Throwable accumulate)
+ {
+ data.succeeded(true);
+ return complete(accumulate);
+ }
+
+ protected Throwable doAbort(Throwable accumulate)
+ {
+ data.succeeded(false);
+ return complete(accumulate);
+ }
+
+ protected void doPrepare() { }
+
+ /**
+ * Called on startup to scan existing folders for any unfinished leftovers of
+ * operations that were ongoing when the process exited.
+ *
+ * We check if the new transaction file exists first, and if so we clean it up
+ * along with its contents, which includes the old file, else if only the old file exists
+ * it means the operation has completed and we only cleanup the old file with its contents.
+ */
+ static void removeUnfinishedLeftovers(CFMetaData metadata)
+ {
+     Throwable accumulate = null;
+     Set<UUID> ids = new HashSet<>();
+
+     for (File dir : getFolders(metadata, null))
+     {
+         File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+         if (logs == null) // listFiles returns null if the folder cannot be listed
+             continue;
+
+         for (File log : logs)
+         {
+             try (TransactionData data = TransactionData.make(log))
+             {
+                 // we need to check this because there are potentially 2 log files per operation
+                 if (ids.contains(data.id))
+                     continue;
+
+                 ids.add(data.id);
+                 accumulate = data.removeUnfinishedLeftovers(accumulate);
+             }
+         }
+     }
+
+     if (accumulate != null)
+         logger.error("Failed to remove unfinished transaction leftovers", accumulate);
+ }
+
+ /**
+ * Return a set of files that are temporary, that is they are involved with
+ * a transaction that hasn't completed yet.
+ *
+ * Only return the files that exist and that are located in the folder
+ * specified as a parameter or its sub-folders.
+ */
+ static Set<File> getTemporaryFiles(CFMetaData metadata, File folder)
+ {
+     Set<File> ret = new HashSet<>();
+     Set<UUID> ids = new HashSet<>();
+
+     for (File dir : getFolders(metadata, folder))
+     {
+         File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+         if (logs == null) // listFiles returns null if the folder cannot be listed
+             continue;
+
+         for (File log : logs)
+         {
+             try(TransactionData data = TransactionData.make(log))
+             {
+                 // we need to check this because there are potentially 2 log files per transaction
+                 if (ids.contains(data.id))
+                     continue;
+
+                 ids.add(data.id);
+                 ret.addAll(data.getTemporaryFiles()
+                                .stream()
+                                .filter(file -> FileUtils.isContained(folder, file))
+                                .collect(Collectors.toSet()));
+             }
+         }
+     }
+
+     return ret;
+ }
+
+ /** Return the transaction log files that currently exist for this table. */
+ static Set<File> getLogFiles(CFMetaData metadata)
+ {
+     Set<File> ret = new HashSet<>();
+     for (File dir : getFolders(metadata, null))
+     {
+         File[] logs = dir.listFiles((dir1, name) -> TransactionData.isLogFile(name));
+         if (logs != null) // listFiles returns null if the folder cannot be listed
+             ret.addAll(Arrays.asList(logs));
+     }
+
+     return ret;
+ }
+
+ /**
+ * A utility method to work out the existing transaction sub-folders
+ * either for a table, or a specific parent folder, or both.
+ */
+ private static List<File> getFolders(CFMetaData metadata, File folder)
+ {
+ List<File> ret = new ArrayList<>();
+ if (metadata != null)
+ {
+ Directories directories = new Directories(metadata);
+ ret.addAll(directories.getExistingDirectories(Directories.TRANSACTIONS_SUBDIR));
+ }
+
+ if (folder != null)
+ {
+ File opDir = Directories.getExistingDirectory(folder, Directories.TRANSACTIONS_SUBDIR);
+ if (opDir != null)
+ ret.add(opDir);
+ }
+
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 174e634..acc9141 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.Pair;
@@ -60,18 +59,19 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
this.formatType = type;
}
- protected SSTableWriter createWriter()
+ protected SSTableTxnWriter createWriter()
{
- return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+ return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
0,
ActiveRepairService.UNREPAIRED_SSTABLE,
+ 0,
new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
}
private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
{
int maxGen = getNextGeneration(directory, columnFamily);
- return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP, fmt);
+ return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, fmt);
}
private static int getNextGeneration(File directory, final String columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 5ab99e7..519f14e 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -18,9 +18,7 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.StringTokenizer;
+import java.util.*;
import com.google.common.base.CharMatcher;
import com.google.common.base.Objects;
@@ -46,47 +44,35 @@ import static org.apache.cassandra.io.sstable.Component.separator;
*/
public class Descriptor
{
- public static enum Type
- {
- TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
- public final boolean isTemporary;
- public final String marker;
- Type(String marker, boolean isTemporary)
- {
- this.isTemporary = isTemporary;
- this.marker = marker;
- }
- }
-
+ public static String TMP_EXT = ".tmp";
public final File directory;
/** version has the following format: <code>[a-z]+</code> */
public final Version version;
public final String ksname;
public final String cfname;
public final int generation;
- public final Type type;
public final SSTableFormat.Type formatType;
private final int hashCode;
/**
* A descriptor that assumes CURRENT_VERSION.
*/
- public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
+ public Descriptor(File directory, String ksname, String cfname, int generation)
{
- this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, temp, DatabaseDescriptor.getSSTableFormat());
+ this(DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), directory, ksname, cfname, generation, DatabaseDescriptor.getSSTableFormat());
}
- public Descriptor(File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+ public Descriptor(File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
- this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, temp, formatType);
+ this(formatType.info.getLatestVersion(), directory, ksname, cfname, generation, formatType);
}
- public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+ public Descriptor(String version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
- this(formatType.info.getVersion(version), directory, ksname, cfname, generation, temp, formatType);
+ this(formatType.info.getVersion(version), directory, ksname, cfname, generation, formatType);
}
- public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp, SSTableFormat.Type formatType)
+ public Descriptor(Version version, File directory, String ksname, String cfname, int generation, SSTableFormat.Type formatType)
{
assert version != null && directory != null && ksname != null && cfname != null && formatType.info.getLatestVersion().getClass().equals(version.getClass());
this.version = version;
@@ -94,20 +80,24 @@ public class Descriptor
this.ksname = ksname;
this.cfname = cfname;
this.generation = generation;
- this.type = temp;
this.formatType = formatType;
- hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, temp, formatType);
+ hashCode = Objects.hashCode(version, directory, generation, ksname, cfname, formatType);
}
public Descriptor withGeneration(int newGeneration)
{
- return new Descriptor(version, directory, ksname, cfname, newGeneration, type, formatType);
+ return new Descriptor(version, directory, ksname, cfname, newGeneration, formatType);
}
public Descriptor withFormatType(SSTableFormat.Type newType)
{
- return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, type, newType);
+ return new Descriptor(newType.info.getLatestVersion(), directory, ksname, cfname, generation, newType);
+ }
+
+ public String tmpFilenameFor(Component component)
+ {
+ return filenameFor(component) + TMP_EXT;
}
public String filenameFor(Component component)
@@ -130,8 +120,6 @@ public class Descriptor
buff.append(ksname).append(separator);
buff.append(cfname).append(separator);
}
- if (type.isTemporary)
- buff.append(type.marker).append(separator);
buff.append(version).append(separator);
buff.append(generation);
if (formatType != SSTableFormat.Type.LEGACY)
@@ -160,6 +148,37 @@ public class Descriptor
return baseFilename() + separator + suffix;
}
+
+ /** Return any temporary files found in the directory; empty if it cannot be listed */
+ public List<File> getTemporaryFiles()
+ {
+     File[] tmpFiles = directory.listFiles((dir, name) ->
+                                           name.endsWith(Descriptor.TMP_EXT));
+
+     // listFiles returns null when the directory is missing or unreadable
+     if (tmpFiles == null)
+         return new ArrayList<>();
+
+     return new ArrayList<>(Arrays.asList(tmpFiles));
+ }
+
+ /**
+ * Files obsoleted by CASSANDRA-7066 :
+ * - temporary files used to start with either tmp or tmplink
+ * - system.compactions_in_progress sstable files
+ */
+ public static boolean isLegacyFile(String fileName)
+ {
+ return fileName.startsWith("compactions_in_progress") ||
+ fileName.startsWith("tmp") ||
+ fileName.startsWith("tmplink");
+ }
+
+ public static boolean isValidFile(String fileName)
+ {
+ return fileName.endsWith(".db") && !isLegacyFile(fileName);
+ }
+
/**
* @see #fromFilename(File directory, String name)
* @param filename The SSTable filename
@@ -222,7 +241,7 @@ public class Descriptor
String component = skipComponent ? null : tokenStack.pop();
nexttok = tokenStack.pop();
- // generation OR Type
+ // generation OR format type
SSTableFormat.Type fmt = SSTableFormat.Type.LEGACY;
if (!CharMatcher.DIGIT.matchesAllOf(nexttok))
{
@@ -240,20 +259,6 @@ public class Descriptor
if (!version.validate(nexttok))
throw new UnsupportedOperationException("SSTable " + name + " is too old to open. Upgrade to 2.0 first, and run upgradesstables");
- // optional temporary marker
- Type type = Descriptor.Type.FINAL;
- nexttok = tokenStack.peek();
- if (Descriptor.Type.TEMP.marker.equals(nexttok))
- {
- type = Descriptor.Type.TEMP;
- tokenStack.pop();
- }
- else if (Descriptor.Type.TEMPLINK.marker.equals(nexttok))
- {
- type = Descriptor.Type.TEMPLINK;
- tokenStack.pop();
- }
-
// ks/cf names
String ksname, cfname;
if (version.hasNewFileName())
@@ -285,16 +290,7 @@ public class Descriptor
}
assert tokenStack.isEmpty() : "Invalid file name " + name + " in " + directory;
- return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, type, fmt), component);
- }
-
- /**
- * @param type temporary flag
- * @return A clone of this descriptor with the given 'temporary' status.
- */
- public Descriptor asType(Type type)
- {
- return new Descriptor(version, directory, ksname, cfname, generation, type, formatType);
+ return Pair.create(new Descriptor(version, parentDirectory, ksname, cfname, generation, fmt), component);
}
public IMetadataSerializer getMetadataSerializer()
@@ -331,8 +327,7 @@ public class Descriptor
&& that.generation == this.generation
&& that.ksname.equals(this.ksname)
&& that.cfname.equals(this.cfname)
- && that.formatType == this.formatType
- && that.type == this.type;
+ && that.formatType == this.formatType;
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 6f66fd3..e6558eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -259,11 +259,12 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
{
for (SSTableReader sstable : ImmutableList.copyOf(txn.originals()))
{
- // We can't change the sampling level of sstables with the old format, because the serialization format
- // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
- logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
if (!sstable.descriptor.version.hasSamplingLevel())
{
+ // We can't change the sampling level of sstables with the old format, because the serialization format
+ // doesn't include the sampling level. Leave this one as it is. (See CASSANDRA-8993 for details.)
+ logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+
oldFormatSSTables.add(sstable);
txn.cancel(sstable);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 2077152..516534d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -22,6 +22,7 @@ import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.Sets;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.Pair;
@@ -71,7 +71,7 @@ public abstract class SSTable
protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
{
- this(descriptor, new HashSet<Component>(), metadata, partitioner);
+ this(descriptor, new HashSet<>(), metadata, partitioner);
}
protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
@@ -113,7 +113,9 @@ public abstract class SSTable
FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
- FileUtils.delete(desc.filenameFor(Component.SUMMARY));
+
+ if (components.contains(Component.SUMMARY))
+ FileUtils.delete(desc.filenameFor(Component.SUMMARY));
logger.debug("Deleted {}", desc);
return true;
@@ -150,6 +152,15 @@ public abstract class SSTable
return descriptor.ksname;
}
+ @VisibleForTesting
+ public List<String> getAllFilePaths()
+ {
+ List<String> ret = new ArrayList<>();
+ for (Component component : components)
+ ret.add(descriptor.filenameFor(component));
+ return ret;
+ }
+
/**
* @return Descriptor and Component pair. null if given file is not acceptable as SSTable component.
* If component is of unknown type, returns CUSTOM component.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
deleted file mode 100644
index f0eb67f..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Counter;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.db.lifecycle.Tracker;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Blocker;
-
-public class SSTableDeletingTask implements Runnable
-{
- private static final Logger logger = LoggerFactory.getLogger(SSTableDeletingTask.class);
-
- // Deleting sstables is tricky because the mmapping might not have been finalized yet,
- // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs).
- // Additionally, we need to make sure to delete the data file first, so on restart the others
- // will be recognized as GCable.
- private static final Queue<SSTableDeletingTask> failedTasks = new ConcurrentLinkedQueue<>();
- private static final Blocker blocker = new Blocker();
-
- private final Descriptor desc;
- private final Set<Component> components;
- private final long bytesOnDisk;
- private final Counter totalDiskSpaceUsed;
-
- /**
- * realDescriptor is the actual descriptor for the sstable, the descriptor inside
- * referent can be 'faked' as FINAL for early opened files. We need the real one
- * to be able to remove the files.
- */
- public SSTableDeletingTask(Descriptor realDescriptor, Set<Component> components, Counter totalDiskSpaceUsed, long bytesOnDisk)
- {
- this.desc = realDescriptor;
- this.bytesOnDisk = bytesOnDisk;
- this.totalDiskSpaceUsed = totalDiskSpaceUsed;
- switch (desc.type)
- {
- case FINAL:
- this.components = components;
- break;
- case TEMPLINK:
- this.components = Sets.newHashSet(Component.DATA, Component.PRIMARY_INDEX);
- break;
- default:
- throw new IllegalStateException();
- }
- }
-
- public void schedule()
- {
- ScheduledExecutors.nonPeriodicTasks.submit(this);
- }
-
- public void run()
- {
- blocker.ask();
- // If we can't successfully delete the DATA component, set the task to be retried later: see above
- File datafile = new File(desc.filenameFor(Component.DATA));
- if (!datafile.delete())
- {
- logger.error("Unable to delete {} (it will be removed on server restart; we'll also retry after GC)", datafile);
- failedTasks.add(this);
- return;
- }
- // let the remainder be cleaned up by delete
- SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
- if (totalDiskSpaceUsed != null)
- totalDiskSpaceUsed.dec(bytesOnDisk);
- }
-
- /**
- * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
- * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
- */
- public static void rescheduleFailedTasks()
- {
- SSTableDeletingTask task;
- while ( null != (task = failedTasks.poll()))
- task.schedule();
- }
-
- /** for tests */
- @VisibleForTesting
- public static void waitForDeletions()
- {
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- }
- };
-
- FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
- }
-
- @VisibleForTesting
- public static void pauseDeletions(boolean stop)
- {
- blocker.block(stop);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b99003b..f25d3ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Multimap;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -78,13 +79,17 @@ public class SSTableLoader implements StreamEventHandler
directory.list(new FilenameFilter()
{
+ final Map<File, Set<File>> allTemporaryFiles = new HashMap<>();
public boolean accept(File dir, String name)
{
- if (new File(dir, name).isDirectory())
+ File file = new File(dir, name);
+
+ if (file.isDirectory())
return false;
+
Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
Descriptor desc = p == null ? null : p.left;
- if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
+ if (p == null || !p.right.equals(Component.DATA))
return false;
if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())
@@ -100,6 +105,19 @@ public class SSTableLoader implements StreamEventHandler
return false;
}
+ Set<File> temporaryFiles = allTemporaryFiles.get(dir);
+ if (temporaryFiles == null)
+ {
+ temporaryFiles = LifecycleTransaction.getTemporaryFiles(metadata, dir);
+ allTemporaryFiles.put(dir, temporaryFiles);
+ }
+
+ if (temporaryFiles.contains(file))
+ {
+ outputHandler.output(String.format("Skipping temporary file %s", name));
+ return false;
+ }
+
Set<Component> components = new HashSet<>();
components.add(Component.DATA);
components.add(Component.PRIMARY_INDEX);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8580644..4eebb0c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,11 +17,9 @@
*/
package org.apache.cassandra.io.sstable;
-import java.io.File;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Runnables;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
@@ -69,6 +67,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
private final List<SSTableWriter> writers = new ArrayList<>();
private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
+ private boolean keepOriginals; // true if we do not want to obsolete the originals
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -95,9 +94,16 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
+ this.keepOriginals = false;
this.preemptiveOpenInterval = preemptiveOpenInterval;
}
+ public SSTableRewriter keepOriginals(boolean val)
+ {
+ keepOriginals = val;
+ return this;
+ }
+
private static long calculateOpenInterval(boolean shouldOpenEarly)
{
long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -189,6 +195,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
{
for (SSTableWriter writer : writers)
accumulate = writer.commit(accumulate);
+
accumulate = transaction.commit(accumulate);
return accumulate;
}
@@ -280,17 +287,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
if (writer != null)
{
writer.abort();
+
+ transaction.untrackNew(writer);
writers.remove(writer);
}
writer = newWriter;
+
return;
}
- SSTableReader reader = null;
if (preemptiveOpenInterval != Long.MAX_VALUE)
{
// we leave it as a tmp file, but we open it and add it to the Tracker
- reader = writer.setMaxDataAge(maxAge).openFinalEarly();
+ SSTableReader reader = writer.setMaxDataAge(maxAge).openFinalEarly();
transaction.update(reader, false);
moveStarts(reader, reader.last);
transaction.checkpoint();
@@ -357,8 +366,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
if (throwLate)
throw new RuntimeException("exception thrown after all sstables finished, for testing");
- // TODO: do we always want to avoid obsoleting if offline?
- if (!isOffline)
+ if (!keepOriginals)
transaction.obsoleteOriginals();
transaction.prepareToCommit();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index db6ed42..a70b92f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
/**
@@ -131,11 +130,13 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
try
{
diskWriter.join();
+ checkForWriterException();
}
- catch (InterruptedException e)
+ catch (Throwable e)
{
throw new RuntimeException(e);
}
+
checkForWriterException();
}
@@ -203,7 +204,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
if (b == SENTINEL)
return;
- try (SSTableWriter writer = createWriter())
+ try (SSTableTxnWriter writer = createWriter())
{
for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
writer.append(entry.getValue().unfilteredIterator());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 0b06405..b22a048 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
/**
* A SSTable writer that assumes rows are in (partitioner) sorted order.
@@ -43,14 +42,14 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
protected DecoratedKey currentKey;
protected PartitionUpdate update;
- private SSTableWriter writer;
+ private SSTableTxnWriter writer;
protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
{
super(directory, metadata, partitioner, columns);
}
- private SSTableWriter getOrCreateWriter()
+ private SSTableTxnWriter getOrCreateWriter()
{
if (writer == null)
writer = createWriter();