You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:34 UTC
[3/7] cassandra git commit: Extend Transactional API to sstable
lifecycle management
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index a526ec9..8029075 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,33 +20,28 @@ package org.apache.cassandra.io.sstable;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.concurrent.Transactional;
-import static org.apache.cassandra.utils.Throwables.merge;
-
/**
* Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
* we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully
* flushed to the index file, and then double check that the key is fully present in the flushed data file.
- * Then we move the starts of each reader forwards to that point, replace them in the datatracker, and attach a runnable
+ * Then we move the starts of each reader forwards to that point, replace them in the Tracker, and attach a runnable
* for on-close (i.e. when all references expire) that drops the page cache prior to that key position
*
* hard-links are created for each partially written sstable so that readers opened against them continue to work past
* the rename of the temporary file, which is deleted once all readers against the hard-link have been closed.
- * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the DataTracker.
+ * If for any reason the writer is rolled over, we immediately rename and fully expose the completed file in the Tracker.
*
* On abort we restore the original lower bounds to the existing readers and delete any temporary files we had in progress,
* but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
@@ -74,26 +69,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
return preemptiveOpenInterval;
}
- private final DataTracker dataTracker;
private final ColumnFamilyStore cfs;
private final long maxAge;
private long repairedAt = -1;
// the set of final readers we will expose on commit
+ private final LifecycleTransaction transaction; // the readers we are rewriting (updated as they are replaced)
private final List<SSTableReader> preparedForCommit = new ArrayList<>();
- private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
- private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
- private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
- private final List<Finished> finishedWriters = new ArrayList<>();
- // as writers are closed from finishedWriters, their last readers are moved into discard, so that abort can cleanup
- // after us safely; we use a set so we can add in both prepareToCommit and abort
- private final Set<SSTableReader> discard = new HashSet<>();
- // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
- private final boolean isOffline;
+ private final List<SSTableWriter> writers = new ArrayList<>();
+ private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of Tracker)
private SSTableWriter writer;
private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
@@ -101,15 +89,11 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
// for testing (TODO: remove when have byteman setup)
private boolean throwEarly, throwLate;
- public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
+ public SSTableRewriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, long maxAge, boolean isOffline)
{
- this.rewriting = rewriting;
- for (SSTableReader sstable : rewriting)
- {
- originalStarts.put(sstable.descriptor, sstable.first);
+ this.transaction = transaction;
+ for (SSTableReader sstable : this.transaction.originals())
fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
- }
- this.dataTracker = cfs.getDataTracker();
this.cfs = cfs;
this.maxAge = maxAge;
this.isOffline = isOffline;
@@ -134,7 +118,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
else
{
boolean save = false;
- for (SSTableReader reader : rewriting)
+ for (SSTableReader reader : transaction.originals())
{
if (reader.getCachedPosition(row.key, false) != null)
{
@@ -170,7 +154,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
{
if (isOffline)
{
- for (SSTableReader reader : rewriting)
+ for (SSTableReader reader : transaction.originals())
{
RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
@@ -181,10 +165,10 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
if (reader != null)
{
- replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
- currentlyOpenedEarly = reader;
+ transaction.update(reader, false);
currentlyOpenedEarlyAt = writer.getFilePointer();
- moveStarts(reader, reader.last, false);
+ moveStarts(reader, reader.last);
+ transaction.checkpoint();
}
}
}
@@ -192,59 +176,19 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
protected Throwable doAbort(Throwable accumulate)
{
- try
- {
- moveStarts(null, null, true);
- }
- catch (Throwable t)
- {
- accumulate = merge(accumulate, t);
- }
-
- // cleanup any sstables we prepared for commit
- for (SSTableReader sstable : preparedForCommit)
- {
- try
- {
- sstable.markObsolete();
- sstable.selfRef().release();
- }
- catch (Throwable t)
- {
- accumulate = merge(accumulate , t);
- }
- }
-
- // abort the writers, and add the early opened readers to our discard pile
-
- if (writer != null)
- finishedWriters.add(new Finished(writer, currentlyOpenedEarly));
-
- for (Finished finished : finishedWriters)
- {
- accumulate = finished.writer.abort(accumulate);
-
- // if we've already been opened, add ourselves to the discard pile
- if (finished.reader != null)
- discard.add(finished.reader);
- }
-
- accumulate = replaceWithFinishedReaders(Collections.<SSTableReader>emptyList(), accumulate);
+ // abort the writers
+ for (SSTableWriter writer : writers)
+ accumulate = writer.abort(accumulate);
+ // abort the lifecycle transaction
+ accumulate = transaction.abort(accumulate);
return accumulate;
}
protected Throwable doCommit(Throwable accumulate)
{
- for (Finished f : finishedWriters)
- accumulate = f.writer.commit(accumulate);
- accumulate = replaceWithFinishedReaders(preparedForCommit, accumulate);
-
- return accumulate;
- }
-
- protected Throwable doCleanup(Throwable accumulate)
- {
- // we have no state of our own to cleanup; Transactional objects cleanup their own state in abort or commit
+ for (SSTableWriter writer : writers)
+ accumulate = writer.commit(accumulate);
+ accumulate = transaction.commit(accumulate);
return accumulate;
}
@@ -260,100 +204,70 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
*
* @param newReader the rewritten reader that replaces them for this region
* @param lowerbound if !reset, must be non-null, and marks the exclusive lowerbound of the start for each sstable
- * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
*/
- private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound, boolean reset)
+ private void moveStarts(SSTableReader newReader, DecoratedKey lowerbound)
{
if (isOffline)
return;
if (preemptiveOpenInterval == Long.MAX_VALUE)
return;
- List<SSTableReader> toReplace = new ArrayList<>();
- List<SSTableReader> replaceWith = new ArrayList<>();
final List<DecoratedKey> invalidateKeys = new ArrayList<>();
- if (!reset)
- {
- invalidateKeys.addAll(cachedKeys.keySet());
- for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
- newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
- }
+ invalidateKeys.addAll(cachedKeys.keySet());
+ for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+ newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
cachedKeys = new HashMap<>();
- for (SSTableReader sstable : ImmutableList.copyOf(rewriting))
+ for (SSTableReader sstable : transaction.originals())
{
// we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
// note: only one such writer should be written to at any moment
- final SSTableReader latest = sstable.getCurrentReplacement();
- SSTableReader replacement;
- if (reset)
- {
- DecoratedKey newStart = originalStarts.get(sstable.descriptor);
- replacement = latest.cloneWithNewStart(newStart, null);
- }
- else
- {
- // skip any sstables that we know to already be shadowed
- if (latest.openReason == SSTableReader.OpenReason.SHADOWED)
- continue;
- if (latest.first.compareTo(lowerbound) > 0)
- continue;
+ final SSTableReader latest = transaction.current(sstable);
- final Runnable runOnClose = new Runnable()
- {
- public void run()
- {
- // this is somewhat racey, in that we could theoretically be closing this old reader
- // when an even older reader is still in use, but it's not likely to have any major impact
- for (DecoratedKey key : invalidateKeys)
- latest.invalidateCacheKey(key);
- }
- };
+ // skip any sstables that we know to already be shadowed
+ if (latest.first.compareTo(lowerbound) > 0)
+ continue;
- if (lowerbound.compareTo(latest.last) >= 0)
+ final Runnable runOnClose = new Runnable()
+ {
+ public void run()
{
- replacement = latest.cloneAsShadowed(runOnClose);
+ // this is somewhat racey, in that we could theoretically be closing this old reader
+ // when an even older reader is still in use, but it's not likely to have any major impact
+ for (DecoratedKey key : invalidateKeys)
+ latest.invalidateCacheKey(key);
}
- else
+ };
+
+ if (lowerbound.compareTo(latest.last) >= 0)
+ {
+ if (!transaction.isObsolete(latest))
{
- DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
- assert newStart != null;
- replacement = latest.cloneWithNewStart(newStart, runOnClose);
+ latest.runOnClose(runOnClose);
+ transaction.obsolete(latest);
}
+ continue;
}
- toReplace.add(latest);
- replaceWith.add(replacement);
- rewriting.remove(sstable);
- rewriting.add(replacement);
+ DecoratedKey newStart = latest.firstKeyBeyond(lowerbound);
+ assert newStart != null;
+ SSTableReader replacement = latest.cloneWithNewStart(newStart, runOnClose);
+ transaction.update(replacement, true);
}
- cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
- }
-
- private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
- {
- if (isOffline)
- return;
- Set<SSTableReader> toReplaceSet;
- if (toReplace != null)
- {
- toReplace.setReplacedBy(replaceWith);
- toReplaceSet = Collections.singleton(toReplace);
- }
- else
- {
- dataTracker.markCompacting(Collections.singleton(replaceWith), true, isOffline);
- toReplaceSet = Collections.emptySet();
- }
- dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
}
public void switchWriter(SSTableWriter newWriter)
{
+ if (newWriter != null)
+ writers.add(newWriter.setMaxDataAge(maxAge));
+
if (writer == null || writer.getFilePointer() == 0)
{
if (writer != null)
+ {
writer.abort();
+ writers.remove(writer);
+ }
writer = newWriter;
return;
}
@@ -361,14 +275,13 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
SSTableReader reader = null;
if (preemptiveOpenInterval != Long.MAX_VALUE)
{
- // we leave it as a tmp file, but we open it and add it to the dataTracker
+ // we leave it as a tmp file, but we open it and add it to the Tracker
reader = writer.setMaxDataAge(maxAge).openFinalEarly();
- replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
- moveStarts(reader, reader.last, false);
+ transaction.update(reader, false);
+ moveStarts(reader, reader.last);
+ transaction.checkpoint();
}
- finishedWriters.add(new Finished(writer, reader));
- currentlyOpenedEarly = null;
currentlyOpenedEarlyAt = 0;
writer = newWriter;
}
@@ -387,12 +300,12 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
/**
* Finishes the new file(s)
*
- * Creates final files, adds the new files to the dataTracker (via replaceReader).
+ * Creates final files, adds the new files to the Tracker (via replaceReader).
*
* We add them to the tracker to be able to get rid of the tmpfiles
*
* It is up to the caller to do the compacted sstables replacement
- * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+ * gymnastics (ie, call Tracker#markCompactedSSTablesReplaced(..))
*
*
*/
@@ -402,6 +315,7 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
return finished();
}
+ // returns, in list form, the
public List<SSTableReader> finished()
{
assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
@@ -416,82 +330,31 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
throw new RuntimeException("exception thrown early in finish, for testing");
// No early open to finalize and replace
- for (Finished f : finishedWriters)
+ for (SSTableWriter writer : writers)
{
- if (f.reader != null)
- discard.add(f.reader);
-
- f.writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit();
- SSTableReader newReader = f.writer.finished();
-
- if (f.reader != null)
- f.reader.setReplacedBy(newReader);
-
- preparedForCommit.add(newReader);
+ assert writer.getFilePointer() > 0;
+ writer.setRepairedAt(repairedAt).setOpenResult(true).prepareToCommit();
+ SSTableReader reader = writer.finished();
+ transaction.update(reader, false);
+ preparedForCommit.add(reader);
}
+ transaction.checkpoint();
if (throwLate)
throw new RuntimeException("exception thrown after all sstables finished, for testing");
- }
- @VisibleForTesting
- void throwDuringPrepare(boolean throwEarly)
- {
- this.throwEarly = throwEarly;
- this.throwLate = !throwEarly;
- }
+ // TODO: do we always want to avoid obsoleting if offline?
+ if (!isOffline)
+ transaction.obsoleteOriginals();
- // cleanup all our temporary readers and swap in our new ones
- private Throwable replaceWithFinishedReaders(List<SSTableReader> finished, Throwable accumulate)
- {
- if (isOffline)
- {
- for (SSTableReader reader : discard)
- {
- try
- {
- if (reader.getCurrentReplacement() == reader)
- reader.markObsolete();
- }
- catch (Throwable t)
- {
- accumulate = merge(accumulate, t);
- }
- }
- accumulate = Refs.release(Refs.selfRefs(discard), accumulate);
- }
- else
- {
- try
- {
- dataTracker.replaceEarlyOpenedFiles(discard, finished);
- }
- catch (Throwable t)
- {
- accumulate = merge(accumulate, t);
- }
- try
- {
- dataTracker.unmarkCompacting(discard);
- }
- catch (Throwable t)
- {
- accumulate = merge(accumulate, t);
- }
- }
- discard.clear();
- return accumulate;
+ transaction.prepareToCommit();
}
- private static final class Finished
+ public void throwDuringPrepare(boolean earlyException)
{
- final SSTableWriter writer;
- final SSTableReader reader;
-
- private Finished(SSTableWriter writer, SSTableReader reader)
- {
- this.writer = writer;
- this.reader = reader;
- }
+ if (earlyException)
+ throwEarly = true;
+ else
+ throwLate = true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 23c27b0..8e701b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.lifecycle.Tracker;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.*;
@@ -122,7 +123,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
* managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are
* cleaned up safely and can be debugged otherwise.
*
- * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
+ * TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies
*/
public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
{
@@ -141,6 +142,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
};
+ // it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
+ public static final class UniqueIdentifier {}
+
public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
{
public int compare(SSTableReader o1, SSTableReader o2)
@@ -170,11 +174,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
NORMAL,
EARLY,
METADATA_CHANGE,
- MOVED_START,
- SHADOWED // => MOVED_START past end
+ MOVED_START
}
public final OpenReason openReason;
+ public final UniqueIdentifier instanceId = new UniqueIdentifier();
// indexfile and datafile: might be null before a call to load()
protected SegmentedFile ifile;
@@ -594,9 +598,22 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return ifile.path();
}
- public void setTrackedBy(DataTracker tracker)
+ // this is only used for restoring tracker state at delete (and wiring up the keycache) and so
+ // should only be called once it is actually added to the tracker
+ public void setupDeleteNotification(Tracker tracker)
{
tidy.type.deletingTask.setTracker(tracker);
+ setupKeyCache();
+ }
+
+ @VisibleForTesting
+ public boolean isDeleteNotificationSetup()
+ {
+ return tidy.type.deletingTask.getTracker() != null;
+ }
+
+ public void setupKeyCache()
+ {
// under normal operation we can do this at any time, but SSTR is also used outside C* proper,
// e.g. by BulkLoader, which does not initialize the cache. As a kludge, we set up the cache
// here when we know we're being wired into the rest of the server infrastructure.
@@ -908,15 +925,38 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
}
- public void setReplacedBy(SSTableReader replacement)
+ public void setReplaced()
{
synchronized (tidy.global)
{
- assert replacement != null;
assert !tidy.isReplaced;
- assert tidy.global.live == this;
tidy.isReplaced = true;
- tidy.global.live = replacement;
+ }
+ }
+
+ public boolean isReplaced()
+ {
+ synchronized (tidy.global)
+ {
+ return tidy.isReplaced;
+ }
+ }
+
+ public void runOnClose(final Runnable runOnClose)
+ {
+ synchronized (tidy.global)
+ {
+ final Runnable existing = tidy.runOnClose;
+ tidy.runOnClose = existing == null
+ ? runOnClose
+ : new Runnable()
+ {
+ public void run()
+ {
+ existing.run();
+ runOnClose.run();
+ }
+ };
}
}
@@ -948,32 +988,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
replacement.first = newStart;
replacement.last = this.last;
- setReplacedBy(replacement);
- return replacement;
- }
- }
-
- public SSTableReader cloneAsShadowed(final Runnable runOnClose)
- {
- synchronized (tidy.global)
- {
- assert openReason != OpenReason.EARLY;
- this.tidy.runOnClose = new Runnable()
- {
- public void run()
- {
- dfile.dropPageCache(0);
- ifile.dropPageCache(0);
- runOnClose.run();
- }
- };
-
- SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
- dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
- maxDataAge, sstableMetadata, OpenReason.SHADOWED);
- replacement.first = first;
- replacement.last = last;
- setReplacedBy(replacement);
return replacement;
}
}
@@ -1036,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
sstableMetadata, OpenReason.METADATA_CHANGE);
replacement.first = this.first;
replacement.last = this.last;
- setReplacedBy(replacement);
return replacement;
}
}
@@ -1520,7 +1533,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* except for threads holding a reference.
*
* @return true if the this is the first time the file was marked obsolete. Calling this
- * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
+ * multiple times is usually buggy (see exceptions in Tracker.unmarkCompacting and removeOldSSTablesSize).
*/
public boolean markObsolete()
{
@@ -1638,11 +1651,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
- public SSTableReader getCurrentReplacement()
- {
- return tidy.global.live;
- }
-
/**
* TODO: Move someplace reusable
*/
@@ -2048,8 +2056,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
private final Descriptor desc;
- // a single convenience property for getting the most recent version of an sstable, not related to tidying
- private SSTableReader live;
// the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
// at once also, for testing purposes
private RestorableMeter readMeter;
@@ -2064,7 +2070,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
{
this.desc = reader.descriptor;
this.isCompacted = new AtomicBoolean();
- this.live = reader;
}
void ensureReadMeter()
@@ -2128,6 +2133,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
}
+ @VisibleForTesting
+ public static void resetTidying()
+ {
+ GlobalTidy.lookup.clear();
+ DescriptorTypeTidy.lookup.clear();
+ }
+
public static abstract class Factory
{
public abstract SSTableReader open(final Descriptor descriptor,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index fa17c20..a7a7fcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -377,7 +377,8 @@ public class BigTableWriter extends SSTableWriter
return accumulate;
}
- protected Throwable doCleanup(Throwable accumulate)
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
{
accumulate = dbuilder.close(accumulate);
return accumulate;
@@ -562,7 +563,8 @@ public class BigTableWriter extends SSTableWriter
return indexFile.abort(accumulate);
}
- protected Throwable doCleanup(Throwable accumulate)
+ @Override
+ protected Throwable doPreCleanup(Throwable accumulate)
{
accumulate = summary.close(accumulate);
accumulate = bf.close(accumulate);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index d63be31..3c35a34 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -77,7 +77,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
protected class TransactionalProxy extends AbstractTransactional
{
@Override
- protected Throwable doCleanup(Throwable accumulate)
+ protected Throwable doPreCleanup(Throwable accumulate)
{
if (directoryFD >= 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index 842d06d..4ab4446 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -215,28 +215,28 @@ public class ColumnFamilyMetrics
{
public Long getValue()
{
- return cfs.getDataTracker().getView().getCurrentMemtable().getOperations();
+ return cfs.getTracker().getView().getCurrentMemtable().getOperations();
}
});
memtableOnHeapSize = createColumnFamilyGauge("MemtableOnHeapSize", new Gauge<Long>()
{
public Long getValue()
{
- return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
+ return cfs.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
}
});
memtableOffHeapSize = createColumnFamilyGauge("MemtableOffHeapSize", new Gauge<Long>()
{
public Long getValue()
{
- return cfs.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
+ return cfs.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
}
});
memtableLiveDataSize = createColumnFamilyGauge("MemtableLiveDataSize", new Gauge<Long>()
{
public Long getValue()
{
- return cfs.getDataTracker().getView().getCurrentMemtable().getLiveDataSize();
+ return cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize();
}
});
allMemtablesOnHeapSize = createColumnFamilyGauge("AllMemtablesHeapSize", new Gauge<Long>()
@@ -245,7 +245,7 @@ public class ColumnFamilyMetrics
{
long size = 0;
for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
- size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
+ size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
return size;
}
});
@@ -255,7 +255,7 @@ public class ColumnFamilyMetrics
{
long size = 0;
for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
- size += cfs2.getDataTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
+ size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
return size;
}
});
@@ -265,7 +265,7 @@ public class ColumnFamilyMetrics
{
long size = 0;
for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
- size += cfs2.getDataTracker().getView().getCurrentMemtable().getLiveDataSize();
+ size += cfs2.getTracker().getView().getCurrentMemtable().getLiveDataSize();
return size;
}
});
@@ -288,7 +288,7 @@ public class ColumnFamilyMetrics
public Long getValue()
{
long memtablePartitions = 0;
- for (Memtable memtable : cfs.getDataTracker().getView().getAllMemtables())
+ for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
memtablePartitions += memtable.partitionCount();
return SSTableReader.getApproximateKeyCount(cfs.getSSTables()) + memtablePartitions;
}
@@ -358,7 +358,7 @@ public class ColumnFamilyMetrics
{
public Integer getValue()
{
- return cfs.getDataTracker().getSSTables().size();
+ return cfs.getTracker().getSSTables().size();
}
});
liveDiskSpaceUsed = createColumnFamilyCounter("LiveDiskSpaceUsed");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 6a70692..44522db 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,13 +28,14 @@ import javax.annotation.Nullable;
import com.google.common.base.Function;
import com.google.common.collect.*;
+
+import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.AbstractBounds;
@@ -318,9 +319,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(Range.makeRowRange(range));
- refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>()
+ refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
{
- public List<SSTableReader> apply(DataTracker.View view)
+ public List<SSTableReader> apply(View view)
{
List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view);
Set<SSTableReader> sstables = Sets.newHashSet();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 9f26637..d32ef88 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -32,12 +32,8 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
-import org.apache.cassandra.db.compaction.LeveledManifest;
-import org.apache.cassandra.db.compaction.Scrubber;
-import org.apache.cassandra.db.compaction.WrappingCompactionStrategy;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -121,9 +117,9 @@ public class StandaloneScrubber
{
for (SSTableReader sstable : sstables)
{
- try
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
{
- Scrubber scrubber = new Scrubber(cfs, sstable, options.skipCorrupted, handler, true, !options.noValidate);
+ Scrubber scrubber = new Scrubber(cfs, txn, options.skipCorrupted, handler, true, !options.noValidate);
try
{
scrubber.scrub();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 2541d6e..e881133 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -27,10 +27,12 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.cli.*;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
@@ -145,12 +147,11 @@ public class StandaloneSplitter
if (options.snapshot)
System.out.println(String.format("Pre-split sstables snapshotted into snapshot %s", snapshotName));
- cfs.getDataTracker().markCompacting(sstables, false, true);
for (SSTableReader sstable : sstables)
{
- try
+ try (LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.UNKNOWN, sstable))
{
- new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
+ new SSTableSplitter(cfs, transaction, options.sizeInMB).split();
// Remove the sstable (it's been copied by split and snapshotted)
sstable.markObsolete();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 409a5f0..626d429 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -29,7 +29,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.Upgrader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.OutputHandler;
@@ -98,9 +100,9 @@ public class StandaloneUpgrader
for (SSTableReader sstable : readers)
{
- try
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UPGRADE_SSTABLES, sstable))
{
- Upgrader upgrader = new Upgrader(cfs, sstable, handler);
+ Upgrader upgrader = new Upgrader(cfs, txn, handler);
upgrader.upgrade();
if (!options.keepSource)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Blocker.java b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
new file mode 100644
index 0000000..5192e98
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Blocker.java
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class Blocker
+{
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition unblocked = lock.newCondition();
+ private volatile boolean block = false;
+
+ public void block(boolean block)
+ {
+ this.block = block;
+ if (!block)
+ {
+ lock.lock();
+ try
+ {
+ unblocked.signalAll();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void ask()
+ {
+ if (block)
+ {
+ lock.lock();
+ try
+ {
+ while (block)
+ unblocked.awaitUninterruptibly();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index bcf5095..5b0eb8e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -70,6 +70,7 @@ public interface Transactional extends AutoCloseable
ABORTED;
}
+ private boolean permitRedundantTransitions;
private State state = State.IN_PROGRESS;
// the methods for actually performing the necessary behaviours, that are themselves protected against
@@ -79,9 +80,18 @@ public interface Transactional extends AutoCloseable
protected abstract Throwable doCommit(Throwable accumulate);
protected abstract Throwable doAbort(Throwable accumulate);
- // this only needs to perform cleanup of state unique to this instance; any internal
+ // these only needs to perform cleanup of state unique to this instance; any internal
// Transactional objects will perform cleanup in the commit() or abort() calls
- protected abstract Throwable doCleanup(Throwable accumulate);
+
+ /**
+ * perform an exception-safe pre-abort cleanup; this will still be run *after* commit
+ */
+ protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; }
+
+ /**
+ * perform an exception-safe post-abort cleanup
+ */
+ protected Throwable doPostCleanup(Throwable accumulate){ return accumulate; }
/**
* Do any preparatory work prior to commit. This method should throw any exceptions that can be encountered
@@ -94,10 +104,13 @@ public interface Transactional extends AutoCloseable
*/
public final Throwable commit(Throwable accumulate)
{
+ if (permitRedundantTransitions && state == State.COMMITTED)
+ return accumulate;
if (state != State.READY_TO_COMMIT)
- throw new IllegalStateException("Commit attempted before prepared to commit");
+ throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state);
accumulate = doCommit(accumulate);
- accumulate = doCleanup(accumulate);
+ accumulate = doPreCleanup(accumulate);
+ accumulate = doPostCleanup(accumulate);
state = State.COMMITTED;
return accumulate;
}
@@ -123,8 +136,9 @@ public interface Transactional extends AutoCloseable
}
state = State.ABORTED;
// we cleanup first so that, e.g., file handles can be released prior to deletion
- accumulate = doCleanup(accumulate);
+ accumulate = doPreCleanup(accumulate);
accumulate = doAbort(accumulate);
+ accumulate = doPostCleanup(accumulate);
return accumulate;
}
@@ -147,6 +161,8 @@ public interface Transactional extends AutoCloseable
*/
public final void prepareToCommit()
{
+ if (permitRedundantTransitions && state == State.READY_TO_COMMIT)
+ return;
if (state != State.IN_PROGRESS)
throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
@@ -183,6 +199,11 @@ public interface Transactional extends AutoCloseable
{
return state;
}
+
+ protected void permitRedundantTransitions()
+ {
+ permitRedundantTransitions = true;
+ }
}
// commit should generally never throw an exception, and preferably never generate one,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index bf71639..e6c8f56 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -18,9 +18,7 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.IOException;
import java.util.*;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -35,6 +33,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.SSTableUtils;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -126,8 +125,11 @@ public class LongCompactionsTest
long start = System.nanoTime();
final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
- assert store.getDataTracker().markCompacting(sstables): "Cannot markCompacting all sstables";
- new CompactionTask(store, sstables, gcBefore, false).execute(null);
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.COMPACTION))
+ {
+ assert txn != null : "Cannot markCompacting all sstables";
+ new CompactionTask(store, txn, gcBefore, false).execute(null);
+ }
System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
this.getClass().getName(),
sstableCount,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
new file mode 100644
index 0000000..bc236e1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -0,0 +1,167 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.SimpleSparseCellNameType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedSegmentedFile;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class MockSchema
+{
+ static
+ {
+ Memory offsets = Memory.allocate(4);
+ offsets.setInt(0, 0);
+ indexSummary = new IndexSummary(Murmur3Partitioner.instance, offsets, 0, Memory.allocate(4), 0, 0, 0, 1);
+ }
+ private static final AtomicInteger id = new AtomicInteger();
+ public static final Keyspace ks = Keyspace.mockKS(new KSMetaData("mockks", SimpleStrategy.class, ImmutableMap.of("replication_factor", "1"), false));
+ public static final ColumnFamilyStore cfs = newCFS();
+
+ private static final IndexSummary indexSummary;
+ private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);
+
+ public static Memtable memtable()
+ {
+ return new Memtable(cfs.metadata);
+ }
+
+ public static SSTableReader sstable(int generation)
+ {
+ return sstable(generation, false);
+ }
+
+ public static SSTableReader sstable(int generation, boolean keepRef)
+ {
+ return sstable(generation, 0, keepRef);
+ }
+
+ public static SSTableReader sstable(int generation, int size)
+ {
+ return sstable(generation, size, false);
+ }
+
+ public static SSTableReader sstable(int generation, int size, boolean keepRef)
+ {
+ return sstable(generation, size, keepRef, cfs);
+ }
+ public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
+ {
+ Descriptor descriptor = new Descriptor(temp("mockcfdir").getParentFile(), "mockks", "mockcf", generation, Descriptor.Type.FINAL);
+ Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
+ for (Component component : components)
+ {
+ File file = new File(descriptor.filenameFor(component));
+ try
+ {
+ file.createNewFile();
+ }
+ catch (IOException e)
+ {
+ }
+ file.deleteOnExit();
+ }
+ if (size > 0)
+ {
+ try
+ {
+ File file = new File(descriptor.filenameFor(Component.DATA));
+ try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+ {
+ raf.setLength(size);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+ .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1)
+ .get(MetadataType.STATS);
+ SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance,
+ segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
+ new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL);
+ reader.first = reader.last = readerBounds(generation);
+ if (!keepRef)
+ reader.selfRef().release();
+ return reader;
+ }
+
+ public static ColumnFamilyStore newCFS()
+ {
+ String cfname = "mockcf" + (id.incrementAndGet());
+ CFMetaData metadata = newCFMetaData(cfname);
+ return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false);
+ }
+
+ private static CFMetaData newCFMetaData(String cfname)
+ {
+ CFMetaData metadata = new CFMetaData("mockks", cfname, ColumnFamilyType.Standard, new SimpleSparseCellNameType(UTF8Type.instance));
+ metadata.caching(CachingOptions.NONE);
+ return metadata;
+ }
+
+ public static BufferDecoratedKey readerBounds(int generation)
+ {
+ return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(generation), ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ }
+
+ private static File temp(String id)
+ {
+ try
+ {
+ File file = File.createTempFile(id, "tmp");
+ file.deleteOnExit();
+ return file;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 9b8e5df..c2205c4 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -24,16 +24,17 @@ import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.nio.channels.FileChannel;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
@@ -44,17 +45,27 @@ import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
+import org.apache.hadoop.fs.FileUtil;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index e5fd470..27e7a2b 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -43,7 +43,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
@@ -171,7 +170,7 @@ public class KeyCacheTest
assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
- Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+ Set<SSTableReader> readers = cfs.getTracker().getSSTables();
Refs<SSTableReader> refs = Refs.tryRef(readers);
if (refs == null)
throw new IllegalStateException();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index a5af823..dbbce9e 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,57 +20,63 @@ package org.apache.cassandra.db;
*
*/
-import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Scrubber;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.Util;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.column;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@RunWith(OrderedJUnit4ClassRunner.class)
public class ScrubTest
@@ -155,7 +161,8 @@ public class ScrubTest
overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
// with skipCorrupted == false, the scrub is expected to fail
- try(Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true))
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+ Scrubber scrubber = new Scrubber(cfs, txn, false, false, true);)
{
scrubber.scrub();
fail("Expected a CorruptSSTableException to be thrown");
@@ -164,7 +171,8 @@ public class ScrubTest
// with skipCorrupted == true, the corrupt rows will be skipped
Scrubber.ScrubResult scrubResult;
- try(Scrubber scrubber = new Scrubber(cfs, sstable, true, false, true))
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+ Scrubber scrubber = new Scrubber(cfs, txn, true, false, true);)
{
scrubResult = scrubber.scrubWithResult();
}
@@ -213,20 +221,24 @@ public class ScrubTest
overrideWithGarbage(sstable, ByteBufferUtil.bytes("0"), ByteBufferUtil.bytes("1"));
// with skipCorrupted == false, the scrub is expected to fail
- Scrubber scrubber = new Scrubber(cfs, sstable, false, false, true);
- try
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+ Scrubber scrubber = new Scrubber(cfs, txn, false, false, true))
{
+ // with skipCorrupted == true, the corrupt row will be skipped
scrubber.scrub();
fail("Expected a CorruptSSTableException to be thrown");
}
catch (IOError err) {}
- // with skipCorrupted == true, the corrupt row will be skipped
- scrubber = new Scrubber(cfs, sstable, true, false, true);
- scrubber.scrub();
- scrubber.close();
- assertEquals(1, cfs.getSSTables().size());
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(Arrays.asList(sstable), OperationType.SCRUB);
+ Scrubber scrubber = new Scrubber(cfs, txn, true, false, true))
+ {
+ // with skipCorrupted == true, the corrupt row will be skipped
+ scrubber.scrub();
+ scrubber.close();
+ }
+ assertEquals(1, cfs.getSSTables().size());
// verify that we can read all of the rows, and there is now one less row
rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
assertEquals(1, rows.size());
@@ -369,9 +381,13 @@ public class ScrubTest
components.add(Component.STATS);
components.add(Component.SUMMARY);
components.add(Component.TOC);
+
SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+ if (sstable.last.compareTo(sstable.first) < 0)
+ sstable.last = sstable.first;
- try(Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true))
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
+ Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
{
scrubber.scrub();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 1dc72ae..235462b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.db.compaction;
-import junit.framework.Assert;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.utils.concurrent.Refs;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -91,13 +90,18 @@ public class AntiCompactionTest
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- Refs<SSTableReader> refs = Refs.ref(sstables);
- long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
-
- assertEquals(2, store.getSSTables().size());
int repairedKeys = 0;
int nonRepairedKeys = 0;
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ if (txn == null)
+ throw new IllegalStateException();
+ long repairedAt = 1000;
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+ }
+
+ assertEquals(2, store.getSSTables().size());
for (SSTableReader sstable : store.getSSTables())
{
try (ISSTableScanner scanner = sstable.getScanner())
@@ -123,7 +127,7 @@ public class AntiCompactionTest
assertFalse(sstable.isMarkedCompacted());
assertEquals(1, sstable.selfRef().globalCount());
}
- assertEquals(0, store.getDataTracker().getCompacting().size());
+ assertEquals(0, store.getTracker().getCompacting().size());
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
@@ -139,13 +143,16 @@ public class AntiCompactionTest
long origSize = s.bytesOnDisk();
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), Refs.tryRef(sstables), 12345);
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), refs, txn, 12345);
+ }
long sum = 0;
for (SSTableReader x : cfs.getSSTables())
sum += x.bytesOnDisk();
assertEquals(sum, cfs.metric.liveDiskSpaceUsed.getCount());
assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.getCount(), 100000);
-
}
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
@@ -210,10 +217,12 @@ public class AntiCompactionTest
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- Refs<SSTableReader> refs = Refs.tryRef(sstables);
- Assert.assertNotNull(refs);
long repairedAt = 1000;
- CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt);
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, repairedAt);
+ }
/*
Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
so there will be no net change in the number of sstables
@@ -256,12 +265,16 @@ public class AntiCompactionTest
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- CompactionManager.instance.performAnticompaction(store, ranges, Refs.tryRef(sstables), 1);
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+ }
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
- assertThat(store.getDataTracker().getCompacting().size(), is(0));
+ assertThat(store.getTracker().getCompacting().size(), is(0));
}
@@ -282,8 +295,12 @@ public class AntiCompactionTest
Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
List<Range<Token>> ranges = Arrays.asList(range);
- Refs<SSTableReader> refs = Refs.ref(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, refs, 0);
+
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 0);
+ }
assertThat(store.getSSTables().size(), is(10));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 88074af..235fd49 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutionException;
import com.google.common.primitives.Longs;
import org.junit.Before;
@@ -41,11 +40,12 @@ import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
import static org.junit.Assert.assertEquals;
public class CompactionAwareWriterTest
@@ -81,10 +81,10 @@ public class CompactionAwareWriterTest
int rowCount = 1000;
cfs.disableAutoCompaction();
populate(cfs, rowCount);
- Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
- long beforeSize = sstables.iterator().next().onDiskLength();
- CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, sstables, sstables, false, OperationType.COMPACTION);
- int rows = compact(cfs, sstables, writer);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+ long beforeSize = txn.originals().iterator().next().onDiskLength();
+ CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION);
+ int rows = compact(cfs, txn, writer);
assertEquals(1, cfs.getSSTables().size());
assertEquals(rowCount, rows);
assertEquals(beforeSize, cfs.getSSTables().iterator().next().onDiskLength());
@@ -100,11 +100,11 @@ public class CompactionAwareWriterTest
cfs.disableAutoCompaction();
int rowCount = 1000;
populate(cfs, rowCount);
- Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
- long beforeSize = sstables.iterator().next().onDiskLength();
+ LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+ long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/10;
- CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, sstables, sstables, sstableSize, 0, false, OperationType.COMPACTION);
- int rows = compact(cfs, sstables, writer);
+ CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION);
+ int rows = compact(cfs, txn, writer);
assertEquals(10, cfs.getSSTables().size());
assertEquals(rowCount, rows);
validateData(cfs, rowCount);
@@ -118,10 +118,10 @@ public class CompactionAwareWriterTest
cfs.disableAutoCompaction();
int rowCount = 10000;
populate(cfs, rowCount);
- Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
- long beforeSize = sstables.iterator().next().onDiskLength();
- CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, sstables, sstables, OperationType.COMPACTION, 0);
- int rows = compact(cfs, sstables, writer);
+ LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+ long beforeSize = txn.originals().iterator().next().onDiskLength();
+ CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0);
+ int rows = compact(cfs, txn, writer);
long expectedSize = beforeSize / 2;
List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables());
@@ -154,11 +154,11 @@ public class CompactionAwareWriterTest
int rowCount = 20000;
int targetSSTableCount = 50;
populate(cfs, rowCount);
- Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
- long beforeSize = sstables.iterator().next().onDiskLength();
+ LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
+ long beforeSize = txn.originals().iterator().next().onDiskLength();
int sstableSize = (int)beforeSize/targetSSTableCount;
- CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, sstables, sstables, sstableSize, false, OperationType.COMPACTION);
- int rows = compact(cfs, sstables, writer);
+ CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION);
+ int rows = compact(cfs, txn, writer);
assertEquals(targetSSTableCount, cfs.getSSTables().size());
int [] levelCounts = new int[5];
assertEquals(rowCount, rows);
@@ -175,13 +175,13 @@ public class CompactionAwareWriterTest
cfs.truncateBlocking();
}
- private int compact(ColumnFamilyStore cfs, Set<SSTableReader> sstables, CompactionAwareWriter writer)
+ private int compact(ColumnFamilyStore cfs, LifecycleTransaction txn, CompactionAwareWriter writer)
{
- assert sstables.size() == 1;
+ assert txn.originals().size() == 1;
int rowsWritten = 0;
- try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(txn.originals()))
{
- CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(System.currentTimeMillis()));
ISSTableScanner scanner = scanners.scanners.get(0);
while(scanner.hasNext())
{
@@ -191,7 +191,6 @@ public class CompactionAwareWriterTest
}
}
Collection<SSTableReader> newSSTables = writer.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newSSTables, OperationType.COMPACTION);
return rowsWritten;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
index f1d016b..64e4465 100644
--- a/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/DateTieredCompactionStrategyTest.java
@@ -324,10 +324,10 @@ public class DateTieredCompactionStrategyTest extends SchemaLoader
Thread.sleep(2000);
AbstractCompactionTask t = dtcs.getNextBackgroundTask((int) (System.currentTimeMillis()/1000));
assertNotNull(t);
- assertEquals(1, Iterables.size(t.sstables));
- SSTableReader sstable = t.sstables.iterator().next();
+ assertEquals(1, Iterables.size(t.transaction.originals()));
+ SSTableReader sstable = t.transaction.originals().iterator().next();
assertEquals(sstable, expiredSSTable);
- cfs.getDataTracker().unmarkCompacting(cfs.getSSTables());
+ t.transaction.abort();
}
}