You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/11 10:36:55 UTC
svn commit: r1090978 - in /cassandra/trunk: ./ conf/
src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/cli/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/ src/java/org/apache/c...
Author: slebresne
Date: Mon Apr 11 08:36:55 2011
New Revision: 1090978
URL: http://svn.apache.org/viewvc?rev=1090978&view=rev
Log:
Multithreaded compactions
patch by stuhood; reviewed by slebresne for CASSANDRA-2191
Added:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Apr 11 08:36:55 2011
@@ -19,6 +19,7 @@
* purge tombstones from row cache (CASSANDRA-2305)
* push replication_factor into strategy_options (CASSANDRA-1263)
* give snapshots the same name on each node (CASSANDRA-1791)
+ * multithreaded compaction (CASSANDRA-2191)
0.7.5
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Apr 11 08:36:55 2011
@@ -249,6 +249,10 @@ column_index_size_in_kb: 64
# will be logged specifying the row key.
in_memory_compaction_limit_in_mb: 64
+# Enables multiple compactions to execute at once. This is highly recommended
+# for preserving read performance in a mixed read/write workload.
+compaction_multithreading: true
+
# Track cached row keys during compaction, and re-cache their new
# positions in the compacted sstable. Disable if you use really large
# key caches.
@@ -357,4 +361,4 @@ encryption_options:
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
- truststore_password: cassandra
\ No newline at end of file
+ truststore_password: cassandra
Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Mon Apr 11 08:36:55 2011
@@ -22,6 +22,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -34,7 +35,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
@@ -46,6 +47,9 @@ public abstract class AutoSavingCache<K,
{
private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
+ /** True if a cache flush is currently executing: only one may execute at a time. */
+ public static final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
protected final String cfName;
protected final String tableName;
protected volatile ScheduledFuture<?> saveTask;
@@ -75,7 +79,7 @@ public abstract class AutoSavingCache<K,
public Writer getWriter()
{
- return new Writer();
+ return new Writer(tableName, cfName);
}
public void scheduleSaving(int savePeriodInSeconds)
@@ -184,22 +188,34 @@ public abstract class AutoSavingCache<K,
}
}
- public class Writer implements ICompactionInfo
+ public class Writer implements CompactionInfo.Holder
{
private final Set<K> keys;
+ private final CompactionInfo info;
private final long estimatedTotalBytes;
private long bytesWritten;
- private Writer()
+ private Writer(String ksname, String cfname)
{
keys = getKeySet();
-
long bytes = 0;
for (K key : keys)
bytes += translateKey(key).remaining();
-
// an approximation -- the keyset can change while saving
estimatedTotalBytes = bytes;
+ info = new CompactionInfo(ksname,
+ cfname,
+ "Save " + getCachePath().getName(),
+ 0,
+ estimatedTotalBytes);
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ long bytesWritten = this.bytesWritten;
+ // keyset can change in size, thus totalBytes can too
+ return info.forProgress(bytesWritten,
+ Math.max(bytesWritten, estimatedTotalBytes));
}
public void saveCache() throws IOException
@@ -238,26 +254,5 @@ public abstract class AutoSavingCache<K,
logger.info(String.format("Saved %s (%d items) in %d ms",
path.getName(), keys.size(), (System.currentTimeMillis() - start)));
}
-
- public long getTotalBytes()
- {
- // keyset can change in size, thus totalBytes can too
- return Math.max(estimatedTotalBytes, getBytesComplete());
- }
-
- public long getBytesComplete()
- {
- return bytesWritten;
- }
-
- public String getTaskType()
- {
- return "Save " + getCachePath().getName();
- }
-
- public String getColumnFamily()
- {
- return cfName;
- }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Mon Apr 11 08:36:55 2011
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.CompactionInfo;
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.*;
@@ -1548,16 +1549,15 @@ public class CliClient extends CliUserHe
// compaction manager information
if (compactionManagerMBean != null)
{
- String compactionType = compactionManagerMBean.getCompactionType();
-
- // if ongoing compaction type is index build
- if (compactionType != null && compactionType.contains("index build"))
+ for (CompactionInfo info : compactionManagerMBean.getCompactions())
{
- String indexName = compactionManagerMBean.getColumnFamilyInProgress();
- long bytesCompacted = compactionManagerMBean.getBytesCompacted();
- long totalBytesToProcess = compactionManagerMBean.getBytesTotalInProgress();
-
- sessionState.out.printf("%nCurrently building index %s, completed %d of %d bytes.%n", indexName, bytesCompacted, totalBytesToProcess);
+ // if ongoing compaction type is index build
+ if (!info.getTaskType().contains("index build"))
+ continue;
+ sessionState.out.printf("%nCurrently building index %s, completed %d of %d bytes.%n",
+ info.getColumnFamily(),
+ info.getBytesComplete(),
+ info.getTotalBytes());
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Apr 11 08:36:55 2011
@@ -82,6 +82,7 @@ public class Config
/* if the size of columns or super-columns are more than this, indexing will kick in */
public Integer column_index_size_in_kb = 64;
public Integer in_memory_compaction_limit_in_mb = 256;
+ public Boolean compaction_multithreading = true;
public String[] data_file_directories;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Apr 11 08:36:55 2011
@@ -340,7 +340,10 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
}
-
+
+ if (conf.compaction_multithreading == null)
+ conf.compaction_multithreading = true;
+
/* data file and commit log directories. they get created later, when they're needed. */
if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
{
@@ -722,7 +725,12 @@ public class DatabaseDescriptor
{
return conf.in_memory_compaction_limit_in_mb * 1024 * 1024;
}
-
+
+ public static boolean getCompactionMultithreading()
+ {
+ return conf.compaction_multithreading;
+ }
+
public static String[] getAllDataFileLocations()
{
return conf.data_file_directories;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 11 08:36:55 2011
@@ -1038,6 +1038,14 @@ public class ColumnFamilyStore implement
return data.getMemtable();
}
+ /**
+ * Package protected for access from the CompactionManager.
+ */
+ DataTracker getDataTracker()
+ {
+ return data;
+ }
+
public Collection<SSTableReader> getSSTables()
{
return data.getSSTables();
@@ -1754,7 +1762,7 @@ public class ColumnFamilyStore implement
*/
void clearUnsafe()
{
- data.clearUnsafe();
+ data.init();
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Apr 11 08:36:55 2011
@@ -28,8 +28,9 @@ import java.security.MessageDigest;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
@@ -58,13 +60,22 @@ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+/**
+ * A singleton which manages a private executor of ongoing compactions. A read-write lock
+ * controls whether compactions can proceed: an external consumer can completely stop
+ * compactions by acquiring the write half of the lock via getCompactionLock().
+ *
+ * Scheduling for compaction is accomplished by swapping sstables to be compacted into
+ * a set via DataTracker. New scheduling attempts will ignore currently compacting
+ * sstables.
+ */
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
- private final ReentrantLock compactionLock = new ReentrantLock();
- // todo: should provide a way to unlock in mbean?
+ // acquire as read to perform a compaction, and as write to prevent compactions
+ private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
static
{
@@ -83,9 +94,12 @@ public class CompactionManager implement
private CompactionExecutor executor = new CompactionExecutor();
private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
+ /**
+ * @return A lock, for which acquisition means no compactions can run.
+ */
public Lock getCompactionLock()
{
- return compactionLock;
+ return compactionLock.writeLock();
}
/**
@@ -99,7 +113,7 @@ public class CompactionManager implement
{
public Integer call() throws IOException
{
- compactionLock.lock();
+ compactionLock.readLock().lock();
try
{
if (cfs.isInvalid())
@@ -115,26 +129,32 @@ public class CompactionManager implement
logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful");
Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
updateEstimateFor(cfs, buckets);
+ int gcBefore = cfs.isIndex() ? Integer.MAX_VALUE : getDefaultGcBefore(cfs);
for (List<SSTableReader> sstables : buckets)
{
- if (sstables.size() >= minThreshold)
+ if (sstables.size() < minThreshold)
+ continue;
+ // if we have too many to compact all at once, compact older ones first -- this avoids
+ // re-compacting files we just created.
+ Collections.sort(sstables);
+ Collection<SSTableReader> tocompact = cfs.getDataTracker().markCompacting(sstables, minThreshold, maxThreshold);
+ if (tocompact == null)
+ // enough threads are busy in this bucket
+ continue;
+ try
+ {
+ return doCompaction(cfs, tocompact, gcBefore);
+ }
+ finally
{
- // if we have too many to compact all at once, compact older ones first -- this avoids
- // re-compacting files we just created.
- Collections.sort(sstables);
- int gcBefore = cfs.isIndex()
- ? Integer.MAX_VALUE
- : getDefaultGcBefore(cfs);
- return doCompaction(cfs,
- sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
- gcBefore);
+ cfs.getDataTracker().unmarkCompacting(tocompact);
}
}
}
finally
{
- compactionLock.unlock();
+ compactionLock.readLock().unlock();
}
return 0;
}
@@ -171,16 +191,40 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- compactionLock.lock();
+ // acquire the write lock to schedule all sstables
+ compactionLock.writeLock().lock();
try
{
- if (!cfStore.isInvalid())
- doCleanupCompaction(cfStore, renewer);
+ if (cfStore.isInvalid())
+ return this;
+ Collection<SSTableReader> tocleanup = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
+ if (tocleanup == null || tocleanup.isEmpty())
+ return this;
+ try
+ {
+ // downgrade the lock acquisition
+ compactionLock.readLock().lock();
+ compactionLock.writeLock().unlock();
+ try
+ {
+ doCleanupCompaction(cfStore, tocleanup, renewer);
+ }
+ finally
+ {
+ compactionLock.readLock().unlock();
+ }
+ }
+ finally
+ {
+ cfStore.getDataTracker().unmarkCompacting(tocleanup);
+ }
return this;
}
finally
{
- compactionLock.unlock();
+ // we probably already downgraded
+ if (compactionLock.writeLock().isHeldByCurrentThread())
+ compactionLock.writeLock().unlock();
}
}
};
@@ -193,16 +237,41 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- compactionLock.lock();
+ // acquire the write lock to schedule all sstables
+ compactionLock.writeLock().lock();
try
{
- if (!cfStore.isInvalid())
- doScrub(cfStore);
+ if (cfStore.isInvalid())
+ return this;
+
+ Collection<SSTableReader> toscrub = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
+ if (toscrub == null || toscrub.isEmpty())
+ return this;
+ try
+ {
+ // downgrade the lock acquisition
+ compactionLock.readLock().lock();
+ compactionLock.writeLock().unlock();
+ try
+ {
+ doScrub(cfStore, toscrub);
+ }
+ finally
+ {
+ compactionLock.readLock().unlock();
+ }
+ }
+ finally
+ {
+ cfStore.getDataTracker().unmarkCompacting(toscrub);
+ }
return this;
}
finally
{
- compactionLock.unlock();
+ // we probably already downgraded
+ if (compactionLock.writeLock().isHeldByCurrentThread())
+ compactionLock.writeLock().unlock();
}
}
};
@@ -220,7 +289,8 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- compactionLock.lock();
+ // acquire the write lock long enough to schedule all sstables
+ compactionLock.writeLock().lock();
try
{
if (cfStore.isInvalid())
@@ -241,13 +311,35 @@ public class CompactionManager implement
{
sstables = cfStore.getSSTables();
}
-
- doCompaction(cfStore, sstables, gcBefore);
+
+ Collection<SSTableReader> tocompact = cfStore.getDataTracker().markCompacting(sstables, 0, Integer.MAX_VALUE);
+ if (tocompact == null || tocompact.isEmpty())
+ return this;
+ try
+ {
+ // downgrade the lock acquisition
+ compactionLock.readLock().lock();
+ compactionLock.writeLock().unlock();
+ try
+ {
+ doCompaction(cfStore, tocompact, gcBefore);
+ }
+ finally
+ {
+ compactionLock.readLock().unlock();
+ }
+ }
+ finally
+ {
+ cfStore.getDataTracker().unmarkCompacting(tocompact);
+ }
return this;
}
- finally
+ finally
{
- compactionLock.unlock();
+ // we probably already downgraded
+ if (compactionLock.writeLock().isHeldByCurrentThread())
+ compactionLock.writeLock().unlock();
}
}
};
@@ -293,7 +385,7 @@ public class CompactionManager implement
{
public Object call() throws IOException
{
- compactionLock.lock();
+ compactionLock.readLock().lock();
try
{
if (cfs.isInvalid())
@@ -320,16 +412,29 @@ public class CompactionManager implement
{
logger.error("No file to compact for user defined compaction");
}
+ // attempt to schedule the set
+ else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+ {
+ // success: perform the compaction
+ try
+ {
+ doCompaction(cfs, sstables, gcBefore);
+ }
+ finally
+ {
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ }
+ }
else
{
- doCompaction(cfs, sstables, gcBefore);
+ logger.error("SSTables for user defined compaction are already being compacted.");
}
return this;
}
finally
{
- compactionLock.unlock();
+ compactionLock.readLock().unlock();
}
}
};
@@ -350,13 +455,16 @@ public class CompactionManager implement
return null;
}
+ /**
+ * Does not mutate data, so is not scheduled.
+ */
public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
{
Callable<Object> callable = new Callable<Object>()
{
public Object call() throws IOException
{
- compactionLock.lock();
+ compactionLock.readLock().lock();
try
{
if (!cfStore.isInvalid())
@@ -365,7 +473,7 @@ public class CompactionManager implement
}
finally
{
- compactionLock.unlock();
+ compactionLock.readLock().unlock();
}
}
};
@@ -397,7 +505,6 @@ public class CompactionManager implement
table.snapshot("compact-" + cfs.columnFamily);
// sanity check: all sstables must belong to the same cfs
- logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
for (SSTableReader sstable : sstables)
assert sstable.descriptor.cfname.equals(cfs.columnFamily);
@@ -422,6 +529,8 @@ public class CompactionManager implement
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
boolean major = cfs.isCompleteSSTables(sstables);
+ String type = major ? "Major" : "Minor";
+ logger.info("Compacting {}: {}", type, sstables);
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
@@ -433,12 +542,11 @@ public class CompactionManager implement
SSTableWriter writer;
CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
- CompactionIterator ci = new CompactionIterator(sstables, controller); // retain a handle so we can call close()
+ CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
- executor.beginCompaction(cfs.columnFamily, ci);
-
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
+ executor.beginCompaction(ci);
try
{
if (!nni.hasNext())
@@ -473,6 +581,7 @@ public class CompactionManager implement
finally
{
ci.close();
+ executor.finishCompaction(ci);
}
SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
@@ -508,11 +617,11 @@ public class CompactionManager implement
*
* @throws IOException
*/
- private void doScrub(ColumnFamilyStore cfs) throws IOException
+ private void doScrub(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
{
assert !cfs.isIndex();
- for (final SSTableReader sstable : cfs.getSSTables())
+ for (final SSTableReader sstable : sstables)
{
logger.info("Scrubbing " + sstable);
@@ -538,7 +647,7 @@ public class CompactionManager implement
}
SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
- executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+ executor.beginCompaction(new ScrubInfo(dataFile, sstable));
int goodRows = 0, badRows = 0, emptyRows = 0;
while (!dataFile.isEOF())
@@ -682,7 +791,7 @@ public class CompactionManager implement
*
* @throws IOException
*/
- private void doCleanupCompaction(ColumnFamilyStore cfs, NodeId.OneShotRenewer renewer) throws IOException
+ private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, NodeId.OneShotRenewer renewer) throws IOException
{
assert !cfs.isIndex();
Table table = cfs.table;
@@ -690,19 +799,12 @@ public class CompactionManager implement
boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
if (ranges.isEmpty())
{
- logger.info("Cleanup cannot be ran before the node join the ring");
+ logger.info("Cleanup cannot run before a node has joined the ring");
return;
}
- for (SSTableReader sstable : cfs.getSSTables())
+ for (SSTableReader sstable : sstables)
{
- logger.info("Cleaning up " + sstable);
- // Calculate the expected compacted filesize
- long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
- String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
- if (compactionFileLocation == null)
- throw new IOException("disk full");
-
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
@@ -712,23 +814,31 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer = null;
- SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
- SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
- executor.beginCompaction(cfs.columnFamily, new CleanupInfo(sstable, scanner));
try
{
- while (scanner.hasNext())
+ logger.info("Cleaning up " + sstable);
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+ String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
+ if (compactionFileLocation == null)
+ throw new IOException("disk full");
+
+ SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+ SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+ CleanupInfo ci = new CleanupInfo(sstable, scanner);
+ executor.beginCompaction(ci);
+ try
{
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (Range.isTokenInRanges(row.getKey().token, ranges))
+ while (scanner.hasNext())
{
- writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
- writer.append(getCompactedRow(row, sstable.descriptor, false));
- totalkeysWritten++;
- }
- else
- {
- if (!indexedColumns.isEmpty() || isCommutative)
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (Range.isTokenInRanges(row.getKey().token, ranges))
+ {
+ writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
+ writer.append(getCompactedRow(row, sstable.descriptor, false));
+ totalkeysWritten++;
+ }
+ else if (!indexedColumns.isEmpty() || isCommutative)
{
while (row.hasNext())
{
@@ -741,10 +851,15 @@ public class CompactionManager implement
}
}
}
+ finally
+ {
+ scanner.close();
+ executor.finishCompaction(ci);
+ }
}
finally
{
- scanner.close();
+ cfs.getDataTracker().unmarkCompacting(Arrays.asList(sstable));
}
List<SSTableReader> results = new ArrayList<SSTableReader>();
@@ -828,7 +943,7 @@ public class CompactionManager implement
}
CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
- executor.beginCompaction(cfs.columnFamily, ci);
+ executor.beginCompaction(ci);
try
{
Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -845,6 +960,7 @@ public class CompactionManager implement
finally
{
ci.close();
+ executor.finishCompaction(ci);
}
}
@@ -912,23 +1028,33 @@ public class CompactionManager implement
return tablePairs;
}
+ /**
+ * Is not scheduled, because it is performing disjoint work from sstable compaction.
+ */
public Future submitIndexBuild(final ColumnFamilyStore cfs, final Table.IndexBuilder builder)
{
Runnable runnable = new Runnable()
{
public void run()
{
- compactionLock.lock();
+ compactionLock.readLock().lock();
try
{
if (cfs.isInvalid())
return;
- executor.beginCompaction(cfs.columnFamily, builder);
- builder.build();
+ executor.beginCompaction(builder);
+ try
+ {
+ builder.build();
+ }
+ finally
+ {
+ executor.finishCompaction(builder);
+ }
}
finally
{
- compactionLock.unlock();
+ compactionLock.readLock().unlock();
}
}
};
@@ -937,12 +1063,15 @@ public class CompactionManager implement
// future that will be immediately immediately get()ed and executed. Happens during a migration, which locks
// the compaction thread and then reinitializes a ColumnFamilyStore. Under normal circumstances, CFS spawns
// index jobs to the compaction manager (this) and blocks on them.
- if (compactionLock.isHeldByCurrentThread())
+ if (compactionLock.isWriteLockedByCurrentThread())
return new SimpleFuture(runnable);
else
return executor.submit(runnable);
}
+ /**
+ * Submits an sstable to be rebuilt: is not scheduled, since the sstable must not exist.
+ */
public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
{
// invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
@@ -951,15 +1080,22 @@ public class CompactionManager implement
{
public SSTableReader call() throws IOException
{
- compactionLock.lock();
+ compactionLock.readLock().lock();
try
{
- executor.beginCompaction(desc.cfname, builder);
- return builder.build();
+ executor.beginCompaction(builder);
+ try
+ {
+ return builder.build();
+ }
+ finally
+ {
+ executor.finishCompaction(builder);
+ }
}
finally
{
- compactionLock.unlock();
+ compactionLock.readLock().unlock();
}
}
};
@@ -972,8 +1108,27 @@ public class CompactionManager implement
{
public void runMayThrow() throws IOException
{
- executor.beginCompaction(writer.getColumnFamily(), writer);
- writer.saveCache();
+ if (!AutoSavingCache.flushInProgress.compareAndSet(false, true))
+ {
+ logger.debug("Cache flushing was already in progress: skipping {}", writer.getCompactionInfo());
+ return;
+ }
+ try
+ {
+ executor.beginCompaction(writer);
+ try
+ {
+ writer.saveCache();
+ }
+ finally
+ {
+ executor.finishCompaction(writer);
+ }
+ }
+ finally
+ {
+ AutoSavingCache.flushInProgress.set(false);
+ }
}
};
return executor.submit(runnable);
@@ -988,7 +1143,9 @@ public class CompactionManager implement
{
public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
{
- super(getCollatingIterator(cfs.getSSTables(), range), new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false));
+ super("Validation",
+ getCollatingIterator(cfs.getSSTables(), range),
+ new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false));
}
protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws IOException
@@ -1000,12 +1157,6 @@ public class CompactionManager implement
}
return iter;
}
-
- @Override
- public String getTaskType()
- {
- return "Validation";
- }
}
public void checkAllColumnFamilies() throws IOException
@@ -1035,67 +1186,57 @@ public class CompactionManager implement
private static class CompactionExecutor extends DebuggableThreadPoolExecutor
{
- private volatile String columnFamily;
- private volatile ICompactionInfo ci;
+ // a synchronized identity set holding the CompactionInfo.Holder of each running task
+ private final Set<CompactionInfo.Holder> compactions;
public CompactionExecutor()
{
- super("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority());
- }
-
- @Override
- public void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- columnFamily = null;
- ci = null;
+ super(getThreadCount(),
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority()));
+ Map<CompactionInfo.Holder, Boolean> cmap = new IdentityHashMap<CompactionInfo.Holder, Boolean>();
+ compactions = Collections.synchronizedSet(Collections.newSetFromMap(cmap));
}
- void beginCompaction(String columnFamily, ICompactionInfo ci)
+ private static int getThreadCount()
{
- this.columnFamily = columnFamily;
- this.ci = ci;
+ if (!DatabaseDescriptor.getCompactionMultithreading())
+ return 1;
+ return Math.max(2, Runtime.getRuntime().availableProcessors());
}
- public String getColumnFamilyName()
+ void beginCompaction(CompactionInfo.Holder ci)
{
- return columnFamily == null ? null : columnFamily;
+ compactions.add(ci);
}
- public Long getBytesTotal()
+ void finishCompaction(CompactionInfo.Holder ci)
{
- return ci == null ? null : ci.getTotalBytes();
+ compactions.remove(ci);
}
- public Long getBytesCompleted()
+ public List<CompactionInfo.Holder> getCompactions()
{
- return ci == null ? null : ci.getBytesComplete();
+ return new ArrayList<CompactionInfo.Holder>(compactions);
}
-
- public String getType()
- {
- return ci == null ? null : ci.getTaskType();
- }
- }
-
- public String getColumnFamilyInProgress()
- {
- return executor.getColumnFamilyName();
- }
-
- public Long getBytesTotalInProgress()
- {
- return executor.getBytesTotal();
}
- public Long getBytesCompacted()
+ public List<CompactionInfo> getCompactions()
{
- return executor.getBytesCompleted();
+ List<CompactionInfo> out = new ArrayList<CompactionInfo>();
+ for (CompactionInfo.Holder ci : executor.getCompactions())
+ out.add(ci.getCompactionInfo());
+ return out;
}
- public String getCompactionType()
+ public List<String> getCompactionSummary()
{
- return executor.getType();
+ List<String> out = new ArrayList<String>();
+ for (CompactionInfo.Holder ci : executor.getCompactions())
+ out.add(ci.getCompactionInfo().toString());
+ return out;
}
public int getPendingTasks()
@@ -1182,64 +1323,57 @@ public class CompactionManager implement
}
}
- private static class CleanupInfo implements ICompactionInfo
+ private static class CleanupInfo implements CompactionInfo.Holder
{
private final SSTableReader sstable;
private final SSTableScanner scanner;
-
public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
{
this.sstable = sstable;
this.scanner = scanner;
}
- public long getTotalBytes()
- {
- return scanner.getFileLength();
- }
-
- public long getBytesComplete()
- {
- return scanner.getFilePointer();
- }
-
- public String getTaskType()
+ public CompactionInfo getCompactionInfo() // snapshot of cleanup progress: scanner position out of scanner file length
{
- return "Cleanup of " + sstable.getColumnFamilyName();
+ try
+ {
+ return new CompactionInfo(sstable.descriptor.ksname,
+ sstable.descriptor.cfname,
+ "Cleanup of " + sstable.getColumnFamilyName(),
+ scanner.getFilePointer(),
+ scanner.getFileLength());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // keep the cause: a bare RuntimeException hides why the scanner could not be read
+ }
}
}
- private static class ScrubInfo implements ICompactionInfo
+ private static class ScrubInfo implements CompactionInfo.Holder
{
private final BufferedRandomAccessFile dataFile;
private final SSTableReader sstable;
-
public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
{
this.dataFile = dataFile;
this.sstable = sstable;
}
- public long getTotalBytes()
+ public CompactionInfo getCompactionInfo() // snapshot of scrub progress: data file pointer out of data file length
{
try
{
- return dataFile.length();
+ return new CompactionInfo(sstable.descriptor.ksname,
+ sstable.descriptor.cfname,
+ "Scrub " + sstable,
+ dataFile.getFilePointer(),
+ dataFile.length());
}
- catch (IOException e)
+ catch (Exception e)
{
throw new RuntimeException(e); // cause stays attached, exactly as before this change
}
}
-
- public long getBytesComplete()
- {
- return dataFile.getFilePointer();
- }
-
- public String getTaskType()
- {
- return "Scrub " + sstable;
- }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java Mon Apr 11 08:36:55 2011
@@ -18,28 +18,17 @@
package org.apache.cassandra.db;
-public interface CompactionManagerMBean
-{
-
- /**
- * @return the columnfamily currently being compacted; null if none
- */
- public String getColumnFamilyInProgress();
+import java.util.List;
- /**
- * @return the total (data, not including index and filter) bytes being compacted; null if none
- */
- public Long getBytesTotalInProgress();
+import org.apache.cassandra.io.CompactionInfo;
- /**
- * @return the progress on the current compaction; null if none
- */
- public Long getBytesCompacted();
+public interface CompactionManagerMBean
+{
+ /** List of running compaction objects. */
+ public List<CompactionInfo> getCompactions();
- /**
- * @return the type of compaction operation currently in progress; null if none
- */
- public String getCompactionType();
+ /** List of running compaction summary strings. */
+ public List<String> getCompactionSummary();
/**
* @return estimated number of compactions remaining to perform
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Mon Apr 11 08:36:55 2011
@@ -54,7 +54,8 @@ public class DataTracker
public DataTracker(ColumnFamilyStore cfstore)
{
this.cfstore = cfstore;
- this.view = new AtomicReference<View>(new View(new Memtable(cfstore), Collections.<Memtable>emptySet(), Collections.<SSTableReader>emptySet()));
+ this.view = new AtomicReference<View>();
+ this.init();
}
public Memtable getMemtable()
@@ -154,6 +155,57 @@ public class DataTracker
}
}
+ /** Atomically marks up to max of the given sstables as compacting, retrying while the view changes underneath.
+ * @return A subset of the given active sstables that have been marked compacting,
+ * or null if the thresholds cannot be met (max below min, max below 1, or fewer than min eligible sstables): files that are marked compacting must
+ * later be unmarked using unmarkCompacting.
+ */
+ public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
+ {
+ if (max < min || max < 1)
+ return null; // thresholds are unsatisfiable whatever the view contains
+ View currentView, newView;
+ Set<SSTableReader> subset = null;
+ // order preserving set copy of the input; declared outside the loop, so it only ever shrinks across retries
+ Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(tomark);
+ do
+ {
+ currentView = view.get(); // snapshot of the view this attempt is based on
+
+ // find the subset that is active and not already compacting
+ remaining.removeAll(currentView.compacting);
+ remaining.retainAll(currentView.sstables);
+ if (remaining.size() < min)
+ // cannot meet the min threshold
+ return null;
+
+ // cap the newly compacting items into a subset set: at most the first max entries in input order
+ subset = new HashSet<SSTableReader>();
+ Iterator<SSTableReader> iter = remaining.iterator();
+ for (int added = 0; added < max && iter.hasNext(); added++)
+ subset.add(iter.next());
+
+ newView = currentView.markCompacting(subset);
+ }
+ while (!view.compareAndSet(currentView, newView)); // another thread replaced the view: recompute against the new one
+ return subset;
+ }
+
+ /**
+ * Removes files from compacting status: this is different from 'markCompacted'
+ * because it should be run regardless of whether a compaction succeeded. Retries until the updated view is swapped in.
+ */
+ public void unmarkCompacting(Collection<SSTableReader> unmark)
+ {
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ newView = currentView.unmarkCompacting(unmark); // copy of the view without 'unmark' in its compacting set
+ }
+ while (!view.compareAndSet(currentView, newView)); // lost a race with another view update: try again
+ }
+
public void markCompacted(Collection<SSTableReader> sstables)
{
replace(sstables, Collections.<SSTableReader>emptyList());
@@ -180,9 +232,13 @@ public class DataTracker
replace(getSSTables(), Collections.<SSTableReader>emptyList());
}
- public void clearUnsafe()
+ /** (Re)initializes the tracker, purging all references. */
+ void init()
{
- view.set(new View(new Memtable(cfstore), Collections.<Memtable>emptySet(), Collections.<SSTableReader>emptySet()));
+ view.set(new View(new Memtable(cfstore),
+ Collections.<Memtable>emptySet(),
+ Collections.<SSTableReader>emptySet(),
+ Collections.<SSTableReader>emptySet()));
}
private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
@@ -384,31 +440,34 @@ public class DataTracker
/**
* An immutable structure holding the current memtable, the memtables pending
- * flush and the sstables for a column family.
+ * flush, the sstables for a column family, and the sstables that are active
+ * in compaction (a subset of the sstables).
*/
static class View
{
public final Memtable memtable;
public final Set<Memtable> memtablesPendingFlush;
public final Set<SSTableReader> sstables;
+ public final Set<SSTableReader> compacting;
- public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables)
+ public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
{
this.memtable = memtable;
this.memtablesPendingFlush = Collections.unmodifiableSet(pendingFlush);
this.sstables = Collections.unmodifiableSet(sstables);
+ this.compacting = Collections.unmodifiableSet(compacting);
}
public View switchMemtable(Memtable newMemtable)
{
Set<Memtable> newPending = new HashSet<Memtable>(memtablesPendingFlush);
newPending.add(memtable);
- return new View(newMemtable, newPending, sstables);
+ return new View(newMemtable, newPending, sstables, compacting);
}
public View renewMemtable(Memtable newMemtable)
{
- return new View(newMemtable, memtablesPendingFlush, sstables);
+ return new View(newMemtable, memtablesPendingFlush, sstables, compacting);
}
public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -417,7 +476,7 @@ public class DataTracker
Set<SSTableReader> newSSTables = new HashSet<SSTableReader>(sstables);
newPendings.remove(flushedMemtable);
newSSTables.add(newSSTable);
- return new View(memtable, newPendings, newSSTables);
+ return new View(memtable, newPendings, newSSTables, compacting);
}
public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
@@ -425,7 +484,21 @@ public class DataTracker
Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
Iterables.addAll(sstablesNew, replacements);
sstablesNew.removeAll(oldSSTables);
- return new View(memtable, memtablesPendingFlush, sstablesNew);
+ return new View(memtable, memtablesPendingFlush, sstablesNew, compacting);
+ }
+
+ public View markCompacting(Collection<SSTableReader> tomark) // new View whose compacting set also contains tomark
+ {
+ Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting); // copy: this View's own set is wrapped unmodifiable
+ compactingNew.addAll(tomark);
+ return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
+ }
+
+ public View unmarkCompacting(Collection<SSTableReader> tounmark) // new View whose compacting set no longer contains tounmark
+ {
+ Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting); // copy: this View's own set is wrapped unmodifiable
+ compactingNew.removeAll(tounmark);
+ return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
+ }
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Apr 11 08:36:55 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.SSTableDeletingReference;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -597,7 +597,7 @@ public class Table
return replicationStrategy;
}
- public class IndexBuilder implements ICompactionInfo
+ public class IndexBuilder implements CompactionInfo.Holder
{
private final ColumnFamilyStore cfs;
private final SortedSet<ByteBuffer> columns;
@@ -610,6 +610,15 @@ public class Table
this.iter = iter;
}
+ public CompactionInfo getCompactionInfo() // snapshot of index build progress for this column family
+ {
+ return new CompactionInfo(cfs.table.name,
+ cfs.columnFamily,
+ String.format("Secondary index build %s", cfs.columnFamily),
+ iter.getBytesRead(), // bytesComplete: the constructor takes progress first...
+ iter.getTotalBytes()); // ...and totalBytes last
+ }
+
public void build()
{
while (iter.hasNext())
@@ -646,21 +655,6 @@ public class Table
throw new RuntimeException(e);
}
}
-
- public long getTotalBytes()
- {
- return iter.getTotalBytes();
- }
-
- public long getBytesComplete()
- {
- return iter.getBytesRead();
- }
-
- public String getTaskType()
- {
- return String.format("Secondary index build %s", cfs.columnFamily);
- }
}
private Object indexLockFor(ByteBuffer key)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java Mon Apr 11 08:36:55 2011
@@ -61,6 +61,18 @@ public class CompactionController
return forceDeserialize ? basicDeserializingController : basicController;
}
+ /** @return The keyspace name, or "n/a" if this controller was created with a null CFS. */
+ public String getKeyspace()
+ {
+ return cfs != null ? cfs.table.name : "n/a";
+ }
+
+ /** @return The column family name, or "n/a" if this controller was created with a null CFS. */
+ public String getColumnFamily()
+ {
+ return cfs != null ? cfs.columnFamily : "n/a";
+ }
+
public boolean shouldPurge(DecoratedKey key)
{
return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, sstables));
Added: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java?rev=1090978&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java Mon Apr 11 08:36:55 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.Serializable;
+
+/**
+ * Immutable snapshot of one running task: keyspace, column family, task type and byte progress.
+ * Implements serializable to allow structured info to be returned via JMX.
+ */
+public final class CompactionInfo implements Serializable
+{
+ // explicit id so the serialized form does not depend on a compiler-computed default
+ private static final long serialVersionUID = 1L;
+
+ private final String ksname;
+ private final String cfname;
+ private final String tasktype;
+ private final long bytesComplete;
+ private final long totalBytes;
+
+ /** Note the argument order: progress (bytesComplete) comes before the total (totalBytes). */
+ public CompactionInfo(String ksname, String cfname, String tasktype, long bytesComplete, long totalBytes)
+ {
+ this.ksname = ksname;
+ this.cfname = cfname;
+ this.tasktype = tasktype;
+ this.bytesComplete = bytesComplete;
+ this.totalBytes = totalBytes;
+ }
+
+ /** @return A copy of this CompactionInfo with updated progress. */
+ public CompactionInfo forProgress(long bytesComplete, long totalBytes)
+ {
+ return new CompactionInfo(ksname, cfname, tasktype, bytesComplete, totalBytes);
+ }
+
+ /** @return the keyspace name given at construction. */
+ public String getKeyspace()
+ {
+ return ksname;
+ }
+
+ /** @return the column family name given at construction. */
+ public String getColumnFamily()
+ {
+ return cfname;
+ }
+
+ /** @return the number of bytes processed when this snapshot was taken. */
+ public long getBytesComplete()
+ {
+ return bytesComplete;
+ }
+
+ /** @return the total number of bytes to process, as of this snapshot. */
+ public long getTotalBytes()
+ {
+ return totalBytes;
+ }
+
+ /** @return the task type label given at construction. */
+ public String getTaskType()
+ {
+ return tasktype;
+ }
+
+ /** Formats as {@code type@hash(keyspace, columnfamily, complete/total)}; hashCode is not overridden, so the hash is per-instance. */
+ @Override
+ public String toString()
+ {
+ StringBuilder buff = new StringBuilder();
+ buff.append(getTaskType()).append('@').append(hashCode());
+ buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
+ buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes());
+ return buff.append(')').toString();
+ }
+
+ /** Implemented by tasks that can report a CompactionInfo snapshot of their progress. */
+ public interface Holder
+ {
+ public CompactionInfo getCompactionInfo();
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon Apr 11 08:36:55 2011
@@ -45,28 +45,30 @@ import org.apache.cassandra.utils.FBUtil
import org.apache.cassandra.utils.ReducingIterator;
public class CompactionIterator extends ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
-implements Closeable, ICompactionInfo
+implements Closeable, CompactionInfo.Holder
{
private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
public static final int FILE_BUFFER_SIZE = 1024 * 1024;
protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+ protected final String type;
protected final CompactionController controller;
private long totalBytes;
private long bytesRead;
private long row;
- public CompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+ public CompactionIterator(String type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
- this(getCollatingIterator(sstables), controller);
+ this(type, getCollatingIterator(sstables), controller);
}
@SuppressWarnings("unchecked")
- protected CompactionIterator(Iterator iter, CompactionController controller)
+ protected CompactionIterator(String type, Iterator iter, CompactionController controller)
{
super(iter);
+ this.type = type;
this.controller = controller;
row = 0;
totalBytes = bytesRead = 0;
@@ -88,6 +90,15 @@ implements Closeable, ICompactionInfo
return iter;
}
+ public CompactionInfo getCompactionInfo() // progress snapshot built from the current bytesRead/totalBytes counters
+ {
+ return new CompactionInfo(controller.getKeyspace(),
+ controller.getColumnFamily(),
+ type, // task type label handed to the constructor
+ bytesRead,
+ totalBytes);
+ }
+
@Override
protected boolean isEqual(SSTableIdentityIterator o1, SSTableIdentityIterator o2)
{
@@ -160,18 +171,8 @@ implements Closeable, ICompactionInfo
return ((CollatingIterator)source).getIterators();
}
- public long getTotalBytes()
- {
- return totalBytes;
- }
-
- public long getBytesComplete()
- {
- return bytesRead;
- }
-
- public String getTaskType()
+ public String toString()
{
- return controller.isMajor ? "Major" : "Minor";
+ return this.getCompactionInfo().toString();
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 11 08:36:55 2011
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.AbstractCompactedRow;
import org.apache.cassandra.io.CompactionController;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
import org.apache.cassandra.io.LazilyCompactedRow;
import org.apache.cassandra.io.PrecompactedRow;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
@@ -249,7 +249,7 @@ public class SSTableWriter extends SSTab
* Removes the given SSTable from temporary status and opens it, rebuilding the
* bloom filter and row index from the data file.
*/
- public static class Builder implements ICompactionInfo
+ public static class Builder implements CompactionInfo.Holder
{
private final Descriptor desc;
private final OperationType type;
@@ -263,6 +263,24 @@ public class SSTableWriter extends SSTab
cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
}
+ public CompactionInfo getCompactionInfo() // progress of the rebuild: position in the data file out of its length
+ {
+ maybeOpenIndexer(); // indexer is dereferenced below, so make sure it has been opened
+ try
+ {
+ // both file offsets are still valid post-close
+ return new CompactionInfo(desc.ksname,
+ desc.cfname,
+ "SSTable rebuild",
+ indexer.dfile.getFilePointer(),
+ indexer.dfile.length());
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e); // surfaced unchecked: the Holder method declares no checked exception
+ }
+ }
+
// lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
// since the 8MB buffers can use up heap quickly
private void maybeOpenIndexer()
@@ -301,32 +319,6 @@ public class SSTableWriter extends SSTab
logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
}
-
- public long getTotalBytes()
- {
- maybeOpenIndexer();
- try
- {
- // (length is still valid post-close)
- return indexer.dfile.length();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
- }
-
- public long getBytesComplete()
- {
- maybeOpenIndexer();
- // (getFilePointer is still valid post-close)
- return indexer.dfile.getFilePointer();
- }
-
- public String getTaskType()
- {
- return "SSTable rebuild";
- }
}
static class RowIndexer
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Mon Apr 11 08:36:55 2011
@@ -40,6 +40,7 @@ import org.apache.cassandra.concurrent.I
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionInfo;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.utils.EstimatedHistogram;
@@ -324,10 +325,15 @@ public class NodeCmd
public void printCompactionStats(PrintStream outs) // prints one stanza per compaction reported by the MBean, then the pending task count
{
CompactionManagerMBean cm = probe.getCompactionManagerProxy();
- outs.println("compaction type: " + (cm.getCompactionType() == null ? "n/a" : cm.getCompactionType()));
- outs.println("column family: " + (cm.getColumnFamilyInProgress() == null ? "n/a" : cm.getColumnFamilyInProgress()));
- outs.println("bytes compacted: " + (cm.getBytesCompacted() == null ? "n/a" : cm.getBytesCompacted()));
- outs.println("bytes total in progress: " + (cm.getBytesTotalInProgress() == null ? "n/a" : cm.getBytesTotalInProgress() ));
+ for (CompactionInfo c : cm.getCompactions()) // an empty list prints no stanzas at all
+ {
+ outs.println("compaction type: " + c.getTaskType());
+ outs.println("keyspace: " + c.getKeyspace());
+ outs.println("column family: " + c.getColumnFamily());
+ outs.println("bytes compacted: " + c.getBytesComplete());
+ outs.println("bytes total: " + c.getTotalBytes());
+ outs.println("-----------------");
+ }
outs.println("pending tasks: " + cm.getPendingTasks());
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Mon Apr 11 08:36:55 2011
@@ -46,18 +46,23 @@ public class CompactionsTest extends Cle
public static final String TABLE2 = "Keyspace2";
public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
+ public static final int MIN_COMPACTION_THRESHOLD = 2;
+
@Test
public void testCompactions() throws IOException, ExecutionException, InterruptedException
{
- CompactionManager.instance.disableAutoCompaction();
-
// this test does enough rows to force multiple block indexes to be used
Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
final int ROWS_PER_SSTABLE = 10;
+ final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
- for (int j = 0; j < (DatabaseDescriptor.getIndexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+ for (int j = 0; j < SSTABLES; j++) {
for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
DecoratedKey key = Util.dk(String.valueOf(i % 2));
RowMutation rm = new RowMutation(TABLE1, key.key);
@@ -68,10 +73,22 @@ public class CompactionsTest extends Cle
store.forceBlockingFlush();
assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
}
+ // re-enable compaction with thresholds low enough to force a few rounds
+ store.setMinimumCompactionThreshold(2);
+ store.setMaximumCompactionThreshold(4);
+ // loop submitting parallel compactions until they all return 0
while (true)
{
- Future<Integer> ft = CompactionManager.instance.submitMinorIfNeeded(store);
- if (ft.get() == 0)
+ ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+ for (int i = 0; i < 10; i++)
+ compactions.add(CompactionManager.instance.submitMinorIfNeeded(store));
+ // another compaction attempt will be launched in the background by
+ // each completing compaction: not much we can do to control them here
+ boolean progress = false;
+ for (Future<Integer> compaction : compactions)
+ if (compaction.get() > 0)
+ progress = true;
+ if (!progress)
break;
}
if (store.getSSTables().size() > 1)
@@ -136,5 +153,4 @@ public class CompactionsTest extends Cle
buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
assertEquals(1, buckets.size());
}
-
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Mon Apr 11 08:36:55 2011
@@ -309,7 +309,7 @@ public class LazilyCompactedRowTest exte
{
public LazyCompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
- super(sstables, controller);
+ super("Lazy", sstables, controller);
}
@Override
@@ -323,7 +323,7 @@ public class LazilyCompactedRowTest exte
{
public PreCompactingIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
- super(sstables, controller);
+ super("Pre", sstables, controller);
}
@Override