Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/17 17:00:21 UTC
svn commit: r1136904 - in /cassandra/trunk/src/java/org/apache/cassandra/db:
DataTracker.java compaction/AbstractCompactionTask.java
compaction/CompactionManager.java compaction/CompactionTask.java
Author: slebresne
Date: Fri Jun 17 15:00:21 2011
New Revision: 1136904
URL: http://svn.apache.org/viewvc?rev=1136904&view=rev
Log:
Fix compaction of the same sstable by multiple threads
patch by slebresne; reviewed by jbellis for CASSANDRA-2769
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Jun 17 15:00:21 2011
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
@@ -163,6 +162,9 @@ public class DataTracker
{
if (max < min || max < 1)
return null;
+ if (tomark == null || tomark.isEmpty())
+ return null;
+
View currentView, newView;
Set<SSTableReader> subset = null;
// order preserving set copy of the input
@@ -190,41 +192,6 @@ public class DataTracker
return subset;
}
- public boolean markCompacting(AbstractCompactionTask task)
- {
- ColumnFamilyStore cfs = task.getColumnFamilyStore();
- return markCompacting(task, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
- }
-
- public boolean markCompacting(AbstractCompactionTask task, int min, int max)
- {
- Collection<SSTableReader> sstablesToMark = task.getSSTables();
- if (sstablesToMark == null || sstablesToMark.isEmpty())
- return false;
-
- if (max < min || max < 1)
- return false;
-
- View currentView, newView;
- // order preserving set copy of the input
- Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(sstablesToMark);
- do
- {
- currentView = view.get();
-
- // find the subset that is active and not already compacting
- remaining.removeAll(currentView.compacting);
- remaining.retainAll(currentView.sstables);
- if (remaining.size() < min || remaining.size() > max)
- // cannot meet the min and max threshold
- return false;
-
- newView = currentView.markCompacting(sstablesToMark);
- }
- while (!view.compareAndSet(currentView, newView));
- return true;
- }
-
/**
* Removes files from compacting status: this is different from 'markCompacted'
* because it should be run regardless of whether a compaction succeeded.
@@ -240,11 +207,6 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
}
- public void unmarkCompacting(AbstractCompactionTask task)
- {
- unmarkCompacting(task.getSSTables());
- }
-
public void markCompacted(Collection<SSTableReader> sstables)
{
replace(sstables, Collections.<SSTableReader>emptyList());
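
For context on the race this hunk removes: the old boolean markCompacting computed the
"remaining" subset (the input minus already-compacting sstables) only to check the
min/max thresholds, then marked and reported success for the full input set, so a task
could go on to compact sstables another task was already compacting. The surviving
variant returns the subset it actually managed to mark, and, as the
AbstractCompactionTask change below shows, the task narrows itself to that subset.
A minimal standalone sketch of the pattern, a lock-free claim of the free subset via
compareAndSet on an immutable snapshot; the names are illustrative and the
threshold and live-sstable checks of the real method are omitted:

    import java.util.*;
    import java.util.concurrent.atomic.AtomicReference;

    public class MarkSketch
    {
        // Immutable snapshot of what is currently compacting (Strings stand
        // in for SSTableReaders).
        static final AtomicReference<Set<String>> compacting =
            new AtomicReference<Set<String>>(Collections.<String>emptySet());

        // Returns the subset of 'candidates' the caller now exclusively owns.
        static Set<String> mark(Collection<String> candidates)
        {
            Set<String> current;
            Set<String> marked;
            do
            {
                current = compacting.get();
                marked = new LinkedHashSet<String>(candidates);
                marked.removeAll(current);   // skip sstables another task owns
                if (marked.isEmpty())
                    return marked;
                Set<String> next = new HashSet<String>(current);
                next.addAll(marked);         // claim only the free subset
            }
            while (!compacting.compareAndSet(current, next)); // lost a race: retry
            return marked;
        }

        public static void main(String[] args)
        {
            System.out.println(mark(Arrays.asList("a", "b"))); // [a, b]
            System.out.println(mark(Arrays.asList("b", "c"))); // [c]: b is taken
        }
    }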
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java Fri Jun 17 15:00:21 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.util.Collection;
+import java.util.Set;
import java.io.IOException;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -47,4 +48,33 @@ public abstract class AbstractCompaction
{
return sstables;
}
+
+ /**
+ * Try to mark the sstable to compact as compacting.
+ * It returns true if some sstables have been marked for compaction, false
+ * otherwise.
+ * This *must* be called before calling execute(). Moreover,
+ * unmarkSSTables *must* always be called after execute() if this
+ * method returns true.
+ */
+ public boolean markSSTablesForCompaction()
+ {
+ return markSSTablesForCompaction(cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
+ }
+
+ public boolean markSSTablesForCompaction(int min, int max)
+ {
+ Set<SSTableReader> marked = cfs.getDataTracker().markCompacting(sstables, min, max);
+
+ if (marked == null || marked.isEmpty())
+ return false;
+
+ this.sstables = marked;
+ return true;
+ }
+
+ public void unmarkSSTables()
+ {
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ }
}
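
The contract spelled out in the javadoc above (mark before execute(), and always
unmark afterwards when marking succeeded) is exactly what the call sites in
CompactionManager below are updated to enforce. A minimal sketch of a correct
caller, with a hypothetical Task interface standing in for AbstractCompactionTask:

    import java.io.IOException;

    public class LifecycleSketch
    {
        // Hypothetical stand-in for AbstractCompactionTask's marking contract.
        interface Task
        {
            boolean markSSTablesForCompaction(); // claim sstables; false = skip
            int execute() throws IOException;    // valid only after a successful mark
            void unmarkSSTables();               // must run even if execute() throws
        }

        static void run(Task task) throws IOException
        {
            if (!task.markSSTablesForCompaction())
                return; // some of these sstables are owned by another thread
            try
            {
                task.execute();
            }
            finally
            {
                task.unmarkSSTables(); // release ownership on every exit path
            }
        }

        public static void main(String[] args) throws IOException
        {
            run(new Task()
            {
                public boolean markSSTablesForCompaction() { return true; }
                public int execute() { return 0; }
                public void unmarkSSTables() { System.out.println("unmarked"); }
            });
        }
    }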
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 17 15:00:21 2011
@@ -110,7 +110,7 @@ public class CompactionManager implement
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
{
- if (!cfs.getDataTracker().markCompacting(task))
+ if (!task.markSSTablesForCompaction())
continue;
try
@@ -119,7 +119,7 @@ public class CompactionManager implement
}
finally
{
- cfs.getDataTracker().unmarkCompacting(task);
+ task.unmarkSSTables();
}
}
}
@@ -246,7 +246,7 @@ public class CompactionManager implement
AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
{
- if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE))
+ if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
return this;
try
{
@@ -264,7 +264,7 @@ public class CompactionManager implement
}
finally
{
- cfStore.getDataTracker().unmarkCompacting(task);
+ task.unmarkSSTables();
}
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 17 15:00:21 2011
@@ -60,12 +60,18 @@ public class CompactionTask extends Abst
/**
* For internal use and testing only. The rest of the system should go through the submit* methods,
* which are properly serialized.
+ * Caller is in charge of marking/unmarking the sstables as compacting.
*/
public int execute(CompactionExecutorStatsCollector collector) throws IOException
{
+ // The collection of sstables passed may be empty (but not null); even if
+ // it is not empty, it may compact down to nothing if all rows are deleted.
+ assert sstables != null;
+
+ Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
if (!isUserDefined)
{
- if (sstables.size() < 2)
+ if (toCompact.size() < 2)
{
logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." +
"Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
@@ -74,18 +80,19 @@ public class CompactionTask extends Abst
}
if (compactionFileLocation == null)
- compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+ compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
// If the compaction file path is null that means we have no space left for this compaction.
// Try again w/o the largest one.
if (compactionFileLocation == null)
{
- Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
- while (compactionFileLocation == null && smallerSSTables.size() > 1)
+ while (compactionFileLocation == null && toCompact.size() > 1)
{
- logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
- smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
- compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
+ logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+ // Note that we have removed files that are still marked as compacting. This is suboptimal but ok since the caller will unmark all
+ // the sstables at the end.
+ toCompact.remove(cfs.getMaxSizeFile(toCompact));
+ compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
}
}
@@ -96,36 +103,32 @@ public class CompactionTask extends Abst
}
}
- // The collection of sstables passed may be empty (but not null); even if
- // it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null;
-
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
// sanity check: all sstables must belong to the same cfs
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
- CompactionController controller = new CompactionController(cfs, sstables, gcBefore, isUserDefined);
+ CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
CompactionType type = controller.isMajor()
? CompactionType.MAJOR
: CompactionType.MINOR;
- logger.info("Compacting {}: {}", type, sstables);
+ logger.info("Compacting {}: {}", type, toCompact);
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
// TODO the int cast here is potentially buggy
- int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
+ int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
+ CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -138,11 +141,11 @@ public class CompactionTask extends Abst
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
- cfs.markCompacted(sstables);
+ cfs.markCompacted(toCompact);
return 0;
}
- writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
+ writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
while (nni.hasNext())
{
AbstractCompactedRow row = nni.next();
@@ -151,7 +154,7 @@ public class CompactionTask extends Abst
if (DatabaseDescriptor.getPreheatKeyCache())
{
- for (SSTableReader sstable : sstables)
+ for (SSTableReader sstable : toCompact)
{
if (sstable.getCachedPosition(row.key) != null)
{
@@ -169,19 +172,19 @@ public class CompactionTask extends Abst
collector.finishCompaction(ci);
}
- SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
- cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+ SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+ cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
ssTable.cacheKey(entry.getKey(), entry.getValue());
CompactionManager.instance.submitBackground(cfs);
long dTime = System.currentTimeMillis() - startTime;
- long startsize = SSTable.getTotalBytes(sstables);
+ long startsize = SSTable.getTotalBytes(toCompact);
long endsize = ssTable.length();
double ratio = (double)endsize / (double)startsize;
logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.",
- writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
- return sstables.size();
+ ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+ return toCompact.size();
}
public static long getMaxDataAge(Collection<SSTableReader> sstables)
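
The retargeting of the space fallback above keeps the original behavior: when no data
directory can hold the projected output, drop the largest input and retry, as long as
at least two inputs remain. A runnable toy of that loop, with long sizes standing in
for sstables and a fake capacity check in place of Table.getDataFileLocation(); all
names here are illustrative:

    import java.util.*;

    public class SpaceFallbackSketch
    {
        static final long CAPACITY = 100; // pretend free disk space, arbitrary units

        // Returns null when the projected output would not fit, mirroring
        // getDataFileLocation() returning null in the patch above.
        static String locationFor(long expectedSize)
        {
            return expectedSize <= CAPACITY ? "/data" : null;
        }

        public static void main(String[] args)
        {
            // sizes of the sstables we were asked to compact
            Set<Long> toCompact = new HashSet<Long>(Arrays.asList(70L, 40L, 20L));
            long total = 70 + 40 + 20;
            String location = locationFor(total);
            // keep dropping the largest input until the projected output fits,
            // as long as at least two inputs remain
            while (location == null && toCompact.size() > 1)
            {
                long largest = Collections.max(toCompact);
                toCompact.remove(largest);
                total -= largest;
                location = locationFor(total);
            }
            System.out.println(toCompact + " -> " + location); // surviving inputs, /data
        }
    }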