You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/22 10:46:36 UTC
[5/7] cassandra git commit: Extend Transactional API to sstable
lifecycle management
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d79b835..004e893 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -54,14 +54,10 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
import org.apache.cassandra.db.index.SecondaryIndexBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -82,12 +78,14 @@ import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
+import static java.util.Collections.singleton;
+
/**
* <p>
* A singleton which manages a private executor of ongoing compactions.
* </p>
* Scheduling for compaction is accomplished by swapping sstables to be compacted into
- * a set via DataTracker. New scheduling attempts will ignore currently compacting
+ * a set via Tracker. New scheduling attempts will ignore currently compacting
* sstables.
*/
public class CompactionManager implements CompactionManagerMBean
@@ -195,7 +193,7 @@ public class CompactionManager implements CompactionManagerMBean
public boolean isCompacting(Iterable<ColumnFamilyStore> cfses)
{
for (ColumnFamilyStore cfs : cfses)
- if (!cfs.getDataTracker().getCompacting().isEmpty())
+ if (!cfs.getTracker().getCompacting().isEmpty())
return true;
return false;
}
@@ -245,22 +243,22 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
+ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
{
- Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
- if (compactingSSTables == null)
- {
- logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.ABORTED;
- }
- if (Iterables.isEmpty(compactingSSTables))
+ try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
{
- logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.SUCCESSFUL;
- }
- try
- {
- Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables);
+ if (compacting == null)
+ {
+ logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.ABORTED;
+ }
+ if (compacting.originals().isEmpty())
+ {
+ logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.SUCCESSFUL;
+ }
+
+ Iterable<SSTableReader> sstables = operation.filterSSTables(compacting.originals());
List<Future<Object>> futures = new ArrayList<>();
for (final SSTableReader sstable : sstables)
@@ -271,31 +269,30 @@ public class CompactionManager implements CompactionManagerMBean
return AllSSTableOpStatus.ABORTED;
}
+ final LifecycleTransaction txn = compacting.split(singleton(sstable));
futures.add(executor.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
- operation.execute(sstable);
+ operation.execute(txn);
return this;
}
}));
}
+ assert compacting.originals().isEmpty();
+
for (Future<Object> f : futures)
f.get();
+ return AllSSTableOpStatus.SUCCESSFUL;
}
- finally
- {
- cfs.getDataTracker().unmarkCompacting(compactingSSTables);
- }
- return AllSSTableOpStatus.SUCCESSFUL;
}
private static interface OneSSTableOperation
{
Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input);
- void execute(SSTableReader input) throws IOException;
+ void execute(LifecycleTransaction input) throws IOException;
}
public enum AllSSTableOpStatus { ABORTED(1), SUCCESSFUL(0);
@@ -318,11 +315,11 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public void execute(SSTableReader input) throws IOException
+ public void execute(LifecycleTransaction input) throws IOException
{
scrubOne(cfs, input, skipCorrupted, checkData);
}
- });
+ }, OperationType.SCRUB);
}
public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, final boolean extendedVerify) throws InterruptedException, ExecutionException
@@ -337,11 +334,11 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public void execute(SSTableReader input) throws IOException
+ public void execute(LifecycleTransaction input) throws IOException
{
- verifyOne(cfs, input, extendedVerify);
+ verifyOne(cfs, input.onlyOne(), extendedVerify);
}
- });
+ }, OperationType.VERIFY);
}
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException
@@ -362,14 +359,14 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public void execute(SSTableReader input) throws IOException
+ public void execute(LifecycleTransaction txn) throws IOException
{
- AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE);
+ AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
task.setUserDefined(true);
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(metrics);
}
- });
+ }, OperationType.UPGRADE_SSTABLES);
}
public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
@@ -395,12 +392,12 @@ public class CompactionManager implements CompactionManagerMBean
}
@Override
- public void execute(SSTableReader input) throws IOException
+ public void execute(LifecycleTransaction txn) throws IOException
{
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
- doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
+ doCleanupOne(cfStore, txn, cleanupStrategy, ranges, hasIndexes);
}
- });
+ }, OperationType.CLEANUP);
}
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
@@ -412,19 +409,19 @@ public class CompactionManager implements CompactionManagerMBean
@Override
public void runMayThrow() throws Exception
{
- boolean success = false;
- while (!success)
+ LifecycleTransaction modifier = null;
+ while (modifier == null)
{
- for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+ for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
sstables.releaseIfHolds(compactingSSTable);
Set<SSTableReader> compactedSSTables = new HashSet<>();
for (SSTableReader sstable : sstables)
if (sstable.isMarkedCompacted())
compactedSSTables.add(sstable);
sstables.release(compactedSSTables);
- success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+ modifier = cfs.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
}
- performAnticompaction(cfs, ranges, sstables, repairedAt);
+ performAnticompaction(cfs, ranges, sstables, modifier, repairedAt);
}
};
if (executor.isShutdown())
@@ -452,6 +449,7 @@ public class CompactionManager implements CompactionManagerMBean
public void performAnticompaction(ColumnFamilyStore cfs,
Collection<Range<Token>> ranges,
Refs<SSTableReader> validatedForRepair,
+ LifecycleTransaction txn,
long repairedAt) throws InterruptedException, IOException
{
logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
@@ -490,16 +488,18 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
- cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
- cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+ txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ assert txn.originals().equals(sstables);
if (!sstables.isEmpty())
- doAntiCompaction(cfs, ranges, sstables, repairedAt);
+ doAntiCompaction(cfs, ranges, txn, repairedAt);
+ txn.finish();
}
finally
{
validatedForRepair.release();
- cfs.getDataTracker().unmarkCompacting(sstables);
+ txn.close();
}
logger.info("Completed anticompaction successfully");
@@ -657,9 +657,9 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean checkData) throws IOException
+ private void scrubOne(ColumnFamilyStore cfs, LifecycleTransaction modifier, boolean skipCorrupted, boolean checkData) throws IOException
{
- Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false, checkData);
+ Scrubber scrubber = new Scrubber(cfs, modifier, skipCorrupted, false, checkData);
CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
metrics.beginCompaction(scrubInfo);
@@ -750,15 +750,16 @@ public class CompactionManager implements CompactionManagerMBean
*
* @throws IOException
*/
- private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
+ private void doCleanupOne(final ColumnFamilyStore cfs, LifecycleTransaction txn, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
{
assert !cfs.isIndex();
- Set<SSTableReader> sstableSet = Collections.singleton(sstable);
+ SSTableReader sstable = txn.onlyOne();
if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
{
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+ txn.obsoleteOriginals();
+ txn.finish();
return;
}
if (!needsCleanup(sstable, ranges))
@@ -772,13 +773,13 @@ public class CompactionManager implements CompactionManagerMBean
long totalkeysWritten = 0;
int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
- (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
+ (int) (SSTableReader.getApproximateKeyCount(txn.originals())));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP));
+ File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -786,10 +787,9 @@ public class CompactionManager implements CompactionManagerMBean
CleanupInfo ci = new CleanupInfo(sstable, scanner);
metrics.beginCompaction(ci);
- Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
List<SSTableReader> finished;
- try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
- CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, txn, sstable.maxDataAge, false);
+ CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs)))
{
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -811,7 +811,6 @@ public class CompactionManager implements CompactionManagerMBean
cfs.indexManager.flushIndexesBlocking();
finished = writer.finish();
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
}
finally
{
@@ -970,11 +969,11 @@ public class CompactionManager implements CompactionManagerMBean
}
}
return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
- (long)expectedBloomFilterSize,
- repairedAt,
- cfs.metadata,
- cfs.partitioner,
- new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
+ (long) expectedBloomFilterSize,
+ repairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
}
@@ -1057,7 +1056,7 @@ public class CompactionManager implements CompactionManagerMBean
long numPartitions = 0;
for (SSTableReader sstable : sstables)
{
- numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
+ numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
}
// determine tree depth from number of partitions, but cap at 20 to prevent large tree.
int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
@@ -1119,37 +1118,39 @@ public class CompactionManager implements CompactionManagerMBean
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
* and subsequently deleted.
* @param cfs
- * @param repairedSSTables
+ * @param repaired a transaction over the repaired sstables to anticompact
* @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
* the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
*/
- private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
- Collection<SSTableReader> repairedSSTables, long repairedAt)
+ private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
{
- logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+ logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
//Group SSTables
- Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables);
+ Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
// iterate over sstables to check if the repaired / unrepaired ranges intersect them.
int antiCompactedSSTableCount = 0;
for (Collection<SSTableReader> sstableGroup : groupedSSTables)
{
- int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt);
- antiCompactedSSTableCount += antiCompacted;
+ try (LifecycleTransaction txn = repaired.split(sstableGroup))
+ {
+ int antiCompacted = antiCompactGroup(cfs, ranges, txn, repairedAt);
+ antiCompactedSSTableCount += antiCompacted;
+ }
}
String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount);
+ logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
}
private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
- Collection<SSTableReader> anticompactionGroup, long repairedAt)
+ LifecycleTransaction anticompactionGroup, long repairedAt)
{
long groupMaxDataAge = -1;
// check that compaction hasn't stolen any sstables used in previous repair sessions
// if we need to skip the anticompaction, it will be carried out by the next repair
- for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();)
+ for (Iterator<SSTableReader> i = anticompactionGroup.originals().iterator(); i.hasNext();)
{
SSTableReader sstable = i.next();
if (!new File(sstable.getFilename()).exists())
@@ -1162,26 +1163,25 @@ public class CompactionManager implements CompactionManagerMBean
groupMaxDataAge = sstable.maxDataAge;
}
-
- if (anticompactionGroup.size() == 0)
+ if (anticompactionGroup.originals().size() == 0)
{
logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
return 0;
}
logger.info("Anticompacting {}", anticompactionGroup);
- Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
long repairedKeyCount = 0;
long unrepairedKeyCount = 0;
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
- AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+ try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
{
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
@@ -1212,12 +1212,18 @@ public class CompactionManager implements CompactionManagerMBean
{
metrics.finishCompaction(ci);
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+
List<SSTableReader> anticompactedSSTables = new ArrayList<>();
- anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish());
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
+ // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
+ // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
+ // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
+ anticompactionGroup.permitRedundantTransitions();
+ repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
+ unRepairedSSTableWriter.prepareToCommit();
+ anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+ anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+ repairedSSTableWriter.commit();
+ unRepairedSSTableWriter.commit();
logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
repairedKeyCount + unrepairedKeyCount,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 34f57c1..e593ec0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.db.compaction;
-import java.io.File;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -44,6 +42,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.UUIDGen;
@@ -55,9 +54,9 @@ public class CompactionTask extends AbstractCompactionTask
protected static long totalBytesCompacted = 0;
private CompactionExecutorStatsCollector collector;
- public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
+ public CompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline)
{
- super(cfs, Sets.newHashSet(sstables));
+ super(cfs, txn);
this.gcBefore = gcBefore;
this.offline = offline;
}
@@ -71,23 +70,20 @@ public class CompactionTask extends AbstractCompactionTask
{
this.collector = collector;
run();
- return sstables.size();
+ return transaction.originals().size();
}
public boolean reduceScopeForLimitedSpace()
{
- if (partialCompactionsAcceptable() && sstables.size() > 1)
+ if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
{
// Try again w/o the largest one.
- logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
+ logger.warn("insufficient space to compact all requested files {}", StringUtils.join(transaction.originals(), ", "));
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
- SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables);
- if (sstables.remove(removedSSTable))
- {
- cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable));
- return true;
- }
+ SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
+ transaction.cancel(removedSSTable);
+ return true;
}
return false;
}
@@ -101,9 +97,9 @@ public class CompactionTask extends AbstractCompactionTask
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null;
+ assert transaction != null;
- if (sstables.size() == 0)
+ if (transaction.originals().isEmpty())
return;
// Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -115,12 +111,12 @@ public class CompactionTask extends AbstractCompactionTask
// note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
// since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
- long expectedWriteSize = cfs.getExpectedCompactedFileSize(sstables, compactionType);
+ long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType);
long earlySSTableEstimate = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
checkAvailableDiskSpace(earlySSTableEstimate, expectedWriteSize);
// sanity check: all sstables must belong to the same cfs
- assert !Iterables.any(sstables, new Predicate<SSTableReader>()
+ assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
{
@Override
public boolean apply(SSTableReader sstable)
@@ -129,13 +125,13 @@ public class CompactionTask extends AbstractCompactionTask
}
});
- UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
+ UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals());
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
StringBuilder ssTableLoggerMsg = new StringBuilder("[");
- for (SSTableReader sstr : sstables)
+ for (SSTableReader sstr : transaction.originals())
{
ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
}
@@ -148,11 +144,11 @@ public class CompactionTask extends AbstractCompactionTask
long totalKeysWritten = 0;
long estimatedKeys = 0;
- try (CompactionController controller = getCompactionController(sstables))
+ try (CompactionController controller = getCompactionController(transaction.originals()))
{
- Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+ Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
- SSTableFormat.Type sstableFormat = getFormatType(sstables);
+ SSTableFormat.Type sstableFormat = getFormatType(transaction.originals());
List<SSTableReader> newSStables;
AbstractCompactionIterable ci;
@@ -171,7 +167,7 @@ public class CompactionTask extends AbstractCompactionTask
if (!controller.cfs.getCompactionStrategy().isActive)
throw new CompactionInterruptedException(ci.getCompactionInfo());
- try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact))
+ try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
{
estimatedKeys = writer.estimatedKeys();
while (iter.hasNext())
@@ -205,13 +201,9 @@ public class CompactionTask extends AbstractCompactionTask
}
}
- Collection<SSTableReader> oldSStables = this.sstables;
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long startsize = SSTableReader.getTotalBytes(transaction.originals());
long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;
@@ -223,7 +215,7 @@ public class CompactionTask extends AbstractCompactionTask
long totalSourceRows = 0;
String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize);
logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+ taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
@@ -236,9 +228,9 @@ public class CompactionTask extends AbstractCompactionTask
}
@Override
- public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables)
{
- return new DefaultCompactionWriter(cfs, allSSTables, nonExpiredSSTables, offline, compactionType);
+ return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 6385671..18d5f7b 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.Pair;
@@ -67,8 +68,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
if (latestBucket.isEmpty())
return null;
- if (cfs.getDataTracker().markCompacting(latestBucket))
- return new CompactionTask(cfs, latestBucket, gcBefore, false);
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
+ if (modifier != null)
+ return new CompactionTask(cfs, modifier, gcBefore, false);
}
}
@@ -366,11 +368,11 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
@Override
public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
{
- Iterable<SSTableReader> sstables = cfs.markAllCompacting();
- if (sstables == null)
+ LifecycleTransaction modifier = cfs.markAllCompacting(OperationType.COMPACTION);
+ if (modifier == null)
return null;
- return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, sstables, gcBefore, false));
+ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, modifier, gcBefore, false));
}
@Override
@@ -378,13 +380,14 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
{
assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
- if (!cfs.getDataTracker().markCompacting(sstables))
+ LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (modifier == null)
{
logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
return null;
}
- return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
+ return new CompactionTask(cfs, modifier, gcBefore, false).setUserDefined(true);
}
public int getEstimatedRemainingTasks()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 6b82ad3..c434d31 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -115,9 +116,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
op = OperationType.COMPACTION;
}
- if (cfs.getDataTracker().markCompacting(candidate.sstables))
+ LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
+ if (txn != null)
{
- LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, candidate.sstables, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+ LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
newTask.setCompactionType(op);
return newTask;
}
@@ -131,9 +133,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
if (Iterables.isEmpty(sstables))
return null;
- if (!cfs.getDataTracker().markCompacting(filteredSSTables))
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
return null;
- return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, filteredSSTables, 0, gcBefore, getMaxSSTableBytes(), true));
+ return Arrays.<AbstractCompactionTask>asList(new LeveledCompactionTask(cfs, txn, 0, gcBefore, getMaxSSTableBytes(), true));
}
@@ -144,19 +147,19 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
}
@Override
- public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, int gcBefore, long maxSSTableBytes)
+ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
- assert sstables.size() > 0;
+ assert txn.originals().size() > 0;
int level = -1;
// if all sstables are in the same level, we can set that level:
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : txn.originals())
{
if (level == -1)
level = sstable.getSSTableLevel();
if (level != sstable.getSSTableLevel())
level = 0;
}
- return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes, false);
+ return new LeveledCompactionTask(cfs, txn, level, gcBefore, maxSSTableBytes, false);
}
/**
@@ -226,7 +229,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
{
for (Integer level : byLevel.keySet())
{
- // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+ // level can be -1 when sstables are added to Tracker but not to LeveledManifest
// since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
if (level <= 0)
{
@@ -402,7 +405,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
if (sstables.isEmpty())
continue;
- Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+ Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
for (SSTableReader sstable : sstables)
{
if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index ce9dfaf..1c3b686 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
public class LeveledCompactionTask extends CompactionTask
{
@@ -31,20 +32,20 @@ public class LeveledCompactionTask extends CompactionTask
private final long maxSSTableBytes;
private final boolean majorCompaction;
- public LeveledCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes, boolean majorCompaction)
+ public LeveledCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level, int gcBefore, long maxSSTableBytes, boolean majorCompaction)
{
- super(cfs, sstables, gcBefore, false);
+ super(cfs, txn, gcBefore, false);
this.level = level;
this.maxSSTableBytes = maxSSTableBytes;
this.majorCompaction = majorCompaction;
}
@Override
- public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
if (majorCompaction)
- return new MajorLeveledCompactionWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
- return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
+ return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
+ return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index daff131..0d0928f 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -23,7 +23,6 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -316,7 +315,7 @@ public class LeveledManifest
continue; // mostly this just avoids polluting the debug log with zero scores
// we want to calculate score excluding compacting ones
Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
- Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting());
+ Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting());
double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes);
logger.debug("Compaction score for level {} is {}", i, score);
@@ -361,7 +360,7 @@ public class LeveledManifest
private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
{
- Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
+ Iterable<SSTableReader> candidates = cfs.getTracker().getUncompacting(sstables);
List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
options.bucketHigh,
@@ -415,7 +414,7 @@ public class LeveledManifest
}
if (min == null || max == null || min.equals(max)) // single partition sstables - we cannot include a high level sstable.
return candidates;
- Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+ Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
Range<RowPosition> boundaries = new Range<>(min, max);
for (SSTableReader sstable : getLevel(i))
{
@@ -542,7 +541,7 @@ public class LeveledManifest
assert !getLevel(level).isEmpty();
logger.debug("Choosing candidates for L{}", level);
- final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+ final Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
if (level == 0)
{
@@ -650,7 +649,7 @@ public class LeveledManifest
{
Set<SSTableReader> sstables = new HashSet<>();
Set<SSTableReader> levelSSTables = new HashSet<>(getLevel(level));
- for (SSTableReader sstable : cfs.getDataTracker().getCompacting())
+ for (SSTableReader sstable : cfs.getTracker().getCompacting())
{
if (levelSSTables.contains(sstable))
sstables.add(sstable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 8d7b0e9..e9a4f05 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -23,6 +23,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
public class SSTableSplitter {
@@ -30,9 +31,9 @@ public class SSTableSplitter {
private CompactionInfo.Holder info;
- public SSTableSplitter(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+ public SSTableSplitter(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
{
- this.task = new SplittingCompactionTask(cfs, sstable, sstableSizeInMB);
+ this.task = new SplittingCompactionTask(cfs, transaction, sstableSizeInMB);
}
public void split()
@@ -57,9 +58,9 @@ public class SSTableSplitter {
{
private final int sstableSizeInMB;
- public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
+ public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction transaction, int sstableSizeInMB)
{
- super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
+ super(cfs, transaction, CompactionManager.NO_GC, true);
this.sstableSizeInMB = sstableSizeInMB;
if (sstableSizeInMB <= 0)
@@ -73,9 +74,9 @@ public class SSTableSplitter {
}
@Override
- public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- return new MaxSSTableSizeWriter(cfs, sstables, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
+ return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 1e014ed..b7c149c 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -23,9 +23,9 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -41,6 +41,7 @@ public class Scrubber implements Closeable
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
+ private final LifecycleTransaction transaction;
private final File destination;
private final boolean skipCorrupted;
@@ -80,15 +81,16 @@ public class Scrubber implements Closeable
};
private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
+ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
{
- this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
+ this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
}
- public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
+ public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
{
this.cfs = cfs;
- this.sstable = sstable;
+ this.transaction = transaction;
+ this.sstable = transaction.onlyOne();
this.outputHandler = outputHandler;
this.skipCorrupted = skipCorrupted;
this.isOffline = isOffline;
@@ -127,9 +129,7 @@ public class Scrubber implements Closeable
public void scrub()
{
outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
- Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
-
- try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);)
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline);)
{
nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
{
@@ -278,8 +278,7 @@ public class Scrubber implements Closeable
inOrderWriter.append(row.key, row.cf);
newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
}
- if (!isOffline)
- cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+ transaction.update(newInOrderSstable, false);
outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
}
@@ -287,8 +286,6 @@ public class Scrubber implements Closeable
List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish();
if (!finished.isEmpty())
newSstable = finished.get(0);
- if (!isOffline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 722536c..94c3daf 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,22 +21,19 @@ import java.util.*;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.utils.Pair;
public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -190,8 +187,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
if (hottestBucket.isEmpty())
return null;
- if (cfs.getDataTracker().markCompacting(hottestBucket))
- return new CompactionTask(cfs, hottestBucket, gcBefore, false);
+ LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
+ if (transaction != null)
+ return new CompactionTask(cfs, transaction, gcBefore, false);
}
}
@@ -200,24 +198,26 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
if (Iterables.isEmpty(filteredSSTables))
return null;
- if (!cfs.getDataTracker().markCompacting(ImmutableList.copyOf(filteredSSTables)))
+ LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
+ if (txn == null)
return null;
if (splitOutput)
- return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, filteredSSTables, gcBefore, false));
- return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false));
+ return Arrays.<AbstractCompactionTask>asList(new SplittingCompactionTask(cfs, txn, gcBefore, false));
+ return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, txn, gcBefore, false));
}
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
{
assert !sstables.isEmpty(); // checked for by CM.submitUserDefined
- if (!cfs.getDataTracker().markCompacting(sstables))
+ LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
+ if (transaction == null)
{
logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
return null;
}
- return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
+ return new CompactionTask(cfs, transaction, gcBefore, false).setUserDefined(true);
}
public int getEstimatedRemainingTasks()
@@ -338,15 +338,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
private static class SplittingCompactionTask extends CompactionTask
{
- public SplittingCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
+ public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore, boolean offline)
{
- super(cfs, sstables, gcBefore, offline);
+ super(cfs, txn, gcBefore, offline);
}
@Override
- public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables)
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- return new SplittingSizeTieredCompactionWriter(cfs, allSSTables, nonExpiredSSTables, compactionType);
+ return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 5bb1530..6556a71 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -20,12 +20,10 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.util.*;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -37,6 +35,7 @@ public class Upgrader
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
+ private final LifecycleTransaction transaction;
private final File directory;
private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -46,10 +45,11 @@ public class Upgrader
private final OutputHandler outputHandler;
- public Upgrader(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler)
+ public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler outputHandler)
{
this.cfs = cfs;
- this.sstable = sstable;
+ this.transaction = txn;
+ this.sstable = txn.onlyOne();
this.outputHandler = outputHandler;
this.directory = new File(sstable.getFilename()).getParentFile();
@@ -81,10 +81,9 @@ public class Upgrader
public void upgrade()
{
outputHandler.output("Upgrading " + sstable);
- Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
- try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
- AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
+ try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
+ AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(transaction.originals()))
{
Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 5345d8d..c511bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -50,7 +50,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
{
super(cfs, cfs.metadata.compactionStrategyOptions);
reloadCompactionStrategy(cfs.metadata);
- cfs.getDataTracker().subscribe(this);
+ cfs.getTracker().subscribe(this);
logger.debug("{} subscribed to the data tracker.", this);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index fe43186..20c96d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
@@ -43,14 +44,14 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long minRepairedAt;
protected final SSTableRewriter sstableWriter;
- public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline)
+ public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
{
this.cfs = cfs;
this.nonExpiredSSTables = nonExpiredSSTables;
this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
- this.sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
+ this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline);
}
/**
@@ -67,12 +68,6 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
}
@Override
- protected Throwable doCleanup(Throwable accumulate)
- {
- return accumulate;
- }
-
- @Override
protected Throwable doCommit(Throwable accumulate)
{
return sstableWriter.commit(accumulate);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 3589b54..0b31061 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers;
import java.io.File;
-import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
@@ -28,14 +27,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import static org.apache.cassandra.utils.Throwables.maybeFail;
-
/**
* The default compaction writer - creates one output file in L0
@@ -44,9 +41,9 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
- public DefaultCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
{
- super(cfs, allSSTables, nonExpiredSSTables, offline);
+ super(cfs, txn, nonExpiredSSTables, offline);
logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@@ -55,7 +52,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
minRepairedAt,
cfs.metadata,
cfs.partitioner,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, 0));
+ new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0));
sstableWriter.switchWriter(writer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index d48140e..014b4af 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.db.compaction.writers;
import java.io.File;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
@@ -28,11 +26,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -50,11 +47,11 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
private int sstablesWritten = 0;
private final boolean skipAncestors;
- public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
+ public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
{
- super(cfs, allSSTables, nonExpiredSSTables, offline);
+ super(cfs, txn, nonExpiredSSTables, offline);
this.maxSSTableSize = maxSSTableSize;
- this.allSSTables = allSSTables;
+ this.allSSTables = txn.originals();
expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index ab24bf8..8903ff7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -18,16 +18,14 @@
package org.apache.cassandra.db.compaction.writers;
import java.io.File;
-import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -41,10 +39,10 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
private final long estimatedSSTables;
private final Set<SSTableReader> allSSTables;
- public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
+ public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
{
- super(cfs, allSSTables, nonExpiredSSTables, offline);
- this.allSSTables = allSSTables;
+ super(cfs, txn, nonExpiredSSTables, offline);
+ this.allSSTables = txn.originals();
this.level = level;
this.maxSSTableSize = maxSSTableSize;
long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 2a452c7..81ea6b1 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction.writers;
import java.io.File;
import java.util.Arrays;
-import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
@@ -28,10 +27,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -53,15 +51,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
private long currentBytesToWrite;
private int currentRatioIndex = 0;
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType)
{
- this(cfs, allSSTables, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
+ this(cfs, txn, nonExpiredSSTables, compactionType, DEFAULT_SMALLEST_SSTABLE_BYTES);
}
- public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
+ public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
{
- super(cfs, allSSTables, nonExpiredSSTables, false);
- this.allSSTables = allSSTables;
+ super(cfs, txn, nonExpiredSSTables, false);
+ this.allSSTables = txn.originals();
totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
double[] potentialRatios = new double[20];
double currentRatio = 1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index f8b3aba..ba48350 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -57,7 +57,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
indexedCfMetadata.cfName,
new LocalPartitioner(getIndexKeyComparator()),
indexedCfMetadata,
- baseCfs.getDataTracker().loadsstables);
+ baseCfs.getTracker().loadsstables);
}
protected AbstractType<?> getIndexKeyComparator()
@@ -143,7 +143,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
{
Future<?> wait;
// we synchronise on the baseCfs to make sure we are ordered correctly with other flushes to the base CFS
- synchronized (baseCfs.getDataTracker())
+ synchronized (baseCfs.getTracker())
{
wait = indexCfs.forceFlush();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index dda532d..4c1bf45 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -346,7 +346,7 @@ public class SecondaryIndexManager
{
// despatch flushes for all CFS backed indexes
List<Future<?>> wait = new ArrayList<>();
- synchronized (baseCfs.getDataTracker())
+ synchronized (baseCfs.getTracker())
{
for (SecondaryIndex index : allIndexes)
if (index.getIndexCfs() != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
new file mode 100644
index 0000000..05f7531
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.util.*;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static com.google.common.base.Predicates.*;
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.getFirst;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+class Helpers
+{
+ /**
+  * Applies a remove/add delta to a set by delegating to the identity-map overload, so the same
+  * assertions apply: items to remove must really be present, and items to add must not be
+  * (unless they are also being removed).
+  * @return a new immutable set with the modified contents; the provided set itself is not changed
+  */
+ static <T> Set<T> replace(Set<T> original, Set<T> remove, Iterable<T> add)
+ {
+     Map<T, T> updated = replace(identityMap(original), remove, add);
+     return ImmutableSet.copyOf(updated.keySet());
+ }
+
+ /**
+  * Builds a new "identity map" from the provided one with {@code remove} taken out and {@code add} put in.
+  * Assertions verify that each removed item is the exact instance currently mapped, that no added item
+  * is already present (unless it is simultaneously removed), and that the resulting size is as expected.
+  * @return a new identity map with the contents of the provided one modified
+  */
+ static <T> Map<T, T> replace(Map<T, T> original, Set<T> remove, Iterable<T> add)
+ {
+     // each item being removed must be the very same instance the map currently holds
+     for (T item : remove)
+         assert original.get(item) == item;
+
+     // true for anything that survives the removal
+     Predicate<T> retained = not(in(remove));
+
+     // nothing we add may already be present, except where it is also being removed
+     assert !any(add, and(retained, in(original.keySet()))) : String.format("original:%s remove:%s add:%s", original.keySet(), remove, add);
+
+     Iterable<T> survivors = filter(original.keySet(), retained);
+     Map<T, T> result = identityMap(concat(add, survivors));
+
+     assert result.size() == original.size() - remove.size() + Iterables.size(add) :
+         String.format("Expecting new size of %d, got %d while replacing %s by %s in %s",
+                       original.size() - remove.size() + Iterables.size(add), result.size(), remove, add, original.keySet());
+     return result;
+ }
+
+ /**
+  * A convenience method for encapsulating this action over multiple SSTableReader with exception-safety.
+  * A failure on one reader is recorded and the remaining readers are still processed, matching the
+  * per-reader behaviour of {@code setReplaced} and {@code markObsolete}.
+  * @param readers the readers to call {@code setupDeleteNotification(tracker)} on
+  * @param tracker the tracker handed to each reader
+  * @param accumulate any previously accumulated failure, or null
+  * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
+  */
+ static Throwable setupDeleteNotification(Iterable<SSTableReader> readers, Tracker tracker, Throwable accumulate)
+ {
+     try
+     {
+         for (SSTableReader reader : readers)
+         {
+             try
+             {
+                 reader.setupDeleteNotification(tracker);
+             }
+             catch (Throwable t)
+             {
+                 // shouldn't be possible, but in case the contract changes in future and we miss it...
+                 // record it and carry on, so one bad reader does not prevent setup of the rest
+                 accumulate = merge(accumulate, t);
+             }
+         }
+     }
+     catch (Throwable t)
+     {
+         // a failure of the iteration itself is still captured rather than propagated
+         accumulate = merge(accumulate, t);
+     }
+     return accumulate;
+ }
+
+ /**
+  * Calls {@code setReplaced()} on every provided reader, isolating failures per reader so that
+  * one throwing reader does not prevent the others from being processed.
+  * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
+  */
+ static Throwable setReplaced(Iterable<SSTableReader> readers, Throwable accumulate)
+ {
+     Throwable failures = accumulate;
+     Iterator<SSTableReader> iter = readers.iterator();
+     while (iter.hasNext())
+     {
+         SSTableReader current = iter.next();
+         try
+         {
+             current.setReplaced();
+         }
+         catch (Throwable failure)
+         {
+             failures = merge(failures, failure);
+         }
+     }
+     return failures;
+ }
+
+ /**
+  * assert that none of these readers have been replaced; the failing reader is named in the
+  * assertion message. This is a no-op when assertions are disabled.
+  */
+ static void checkNotReplaced(Iterable<SSTableReader> readers)
+ {
+     for (SSTableReader reader : readers)
+         assert !reader.isReplaced() : reader + " has already been replaced";
+ }
+
+ /**
+  * Calls {@code markObsolete()} on every provided reader, asserting that each call reports true.
+  * Failures (including a failed assertion) are collected per reader, so the remaining readers
+  * are still processed.
+  * @return accumulate if not null (with any thrown exception attached), or any thrown exception otherwise
+  */
+ static Throwable markObsolete(Iterable<SSTableReader> readers, Throwable accumulate)
+ {
+     Throwable failures = accumulate;
+     for (SSTableReader reader : readers)
+     {
+         try
+         {
+             // the call must happen outside the assert so that it still runs with assertions disabled
+             boolean newlyMarked = reader.markObsolete();
+             assert newlyMarked : reader + " was already marked compacted";
+         }
+         catch (Throwable failure)
+         {
+             failures = merge(failures, failure);
+         }
+     }
+     return failures;
+ }
+
+ /**
+  * Builds an immutable map in which every provided value is mapped to itself.
+  * @return the identity function, as a Map, with domain of the provided values
+  */
+ static <T> Map<T, T> identityMap(Iterable<T> values)
+ {
+     ImmutableMap.Builder<T, T> entries = ImmutableMap.builder();
+     Iterator<T> iter = values.iterator();
+     while (iter.hasNext())
+     {
+         T value = iter.next();
+         entries.put(value, value);
+     }
+     return entries.build();
+ }
+
+ /**
+  * The result is a lazy view: each set is filtered to drop anything contained in an earlier set,
+  * and the filtered sets are then concatenated in the order provided.
+  * @return an Iterable of the union of the sets, with duplicates being represented by their first encountered instance
+  * (as defined by the order of set provision)
+  */
+ @SafeVarargs // the varargs array is only read, never written to or exposed
+ static <T> Iterable<T> concatUniq(Set<T>... sets)
+ {
+     List<Predicate<T>> notInEarlier = new ArrayList<>(sets.length);
+     List<Iterable<T>> results = new ArrayList<>(sets.length);
+     for (Set<T> set : sets)
+     {
+         // and(Iterable) copies its argument, so this captures only the sets seen so far;
+         // for the first set the list is empty and the predicate accepts everything
+         results.add(filter(set, and(notInEarlier)));
+         notInEarlier.add(not(in(set)));
+     }
+     return concat(results);
+ }
+
+ /**
+  * The negation of {@code orIn} over the same sets.
+  * @return a Predicate yielding true for an item present in NONE of the provided sets
+  */
+ @SafeVarargs // the varargs array is only passed on to orIn, which just reads it
+ static <T> Predicate<T> notIn(Set<T>... sets)
+ {
+     return not(orIn(sets));
+ }
+
+ /**
+  * Builds one membership predicate per provided collection and combines them with a logical OR.
+  * @return a Predicate yielding true for an item present in ANY of the provided sets
+  */
+ @SafeVarargs // the varargs array is only read
+ static <T> Predicate<T> orIn(Collection<T>... sets)
+ {
+     // a typed list avoids the unchecked raw Predicate[] creation
+     List<Predicate<T>> members = new ArrayList<>(sets.length);
+     for (Collection<T> set : sets)
+     {
+         Predicate<T> member = in(set);
+         members.add(member);
+     }
+     return or(members);
+ }
+
+ /**
+  * filter out (i.e. remove) matching elements; the result is a lazily evaluated view
+  * @param filter the elements to be filtered
+  * @param inNone the sets whose members are to be excluded
+  * @return filter, filtered to only those elements that *are not* present in *any* of the provided sets (are present in none)
+  */
+ @SafeVarargs // the varargs array is only passed on to notIn, which just reads it
+ static <T> Iterable<T> filterOut(Iterable<T> filter, Set<T>... inNone)
+ {
+     return filter(filter, notIn(inNone));
+ }
+
+ /**
+  * filter in (i.e. retain); the result is a lazily evaluated view
+  *
+  * @param filter the elements to be filtered
+  * @param inAny the sets whose members are to be retained
+  * @return filter, filtered to only those elements that *are* present in *any* of the provided sets
+  */
+ @SafeVarargs // the varargs array is only passed on to orIn, which just reads it
+ static <T> Iterable<T> filterIn(Iterable<T> filter, Set<T>... inAny)
+ {
+     return filter(filter, orIn(inAny));
+ }
+
+ /**
+  * @return the shared empty set from {@link Collections#emptySet()}, typed for SSTableReader
+  */
+ static Set<SSTableReader> emptySet()
+ {
+     return Collections.<SSTableReader>emptySet();
+ }
+
+ /**
+  * Looks up the element of the collection that is equal to the provided item.
+  * When the collection is a Set, its own contains() is consulted first, before any scan.
+  * @return the first element of col equal to t, or null if there is none
+  */
+ static <T> T select(T t, Collection<T> col)
+ {
+     boolean knownAbsent = col instanceof Set && !col.contains(t);
+     if (knownAbsent)
+         return null;
+
+     Iterable<T> matches = filter(col, equalTo(t));
+     return getFirst(matches, null);
+ }
+
+ /**
+  * Searches the provided collections, in order, for an element equal to t.
+  * @param t the item to look for
+  * @param sets the collections to search, in priority order
+  * @return the matching element from the first collection that contains one, or null if none does
+  */
+ @SafeVarargs // the varargs array is only read
+ static <T> T selectFirst(T t, Collection<T> ... sets)
+ {
+     for (Collection<T> set : sets)
+     {
+         T select = select(t, set);
+         if (select != null)
+             return select;
+     }
+     return null;
+ }
+
+ /**
+  * Convenience overload that first converts the set to an identity map.
+  * @return a Predicate yielding true only for an item that is the exact same instance as the one held by the set
+  */
+ static <T> Predicate<T> idIn(Set<T> set)
+ {
+     Map<T, T> asIdentityMap = identityMap(set);
+     return idIn(asIdentityMap);
+ }
+
+ /**
+  * @return a Predicate yielding true only for an item that the provided map maps to that very
+  * same instance (compared by reference, not by equals)
+  */
+ static <T> Predicate<T> idIn(final Map<T, T> identityMap)
+ {
+     Predicate<T> sameInstance = new Predicate<T>()
+     {
+         @Override
+         public boolean apply(T candidate)
+         {
+             T held = identityMap.get(candidate);
+             return held == candidate;
+         }
+     };
+     return sameInstance;
+ }
+
+}