You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/04/11 10:36:55 UTC

svn commit: r1090978 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/c...

Author: slebresne
Date: Mon Apr 11 08:36:55 2011
New Revision: 1090978

URL: http://svn.apache.org/viewvc?rev=1090978&view=rev
Log:
Multithreaded compactions
patch by stuhood; reviewed by slebresne for CASSANDRA-2191

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Apr 11 08:36:55 2011
@@ -19,6 +19,7 @@
  * purge tombstones from row cache (CASSANDRA-2305)
  * push replication_factor into strategy_options (CASSANDRA-1263)
  * give snapshots the same name on each node (CASSANDRA-1791)
+ * multithreaded compaction (CASSANDRA-2191)
 
 
 0.7.5

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Mon Apr 11 08:36:55 2011
@@ -249,6 +249,10 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
+# Enables multiple compactions to execute at once. This is highly recommended
+# for preserving read performance in a mixed read/write workload.
+compaction_multithreading: true
+
 # Track cached row keys during compaction, and re-cache their new
 # positions in the compacted sstable.  Disable if you use really large
 # key caches.
@@ -357,4 +361,4 @@ encryption_options:
     keystore: conf/.keystore
     keystore_password: cassandra
     truststore: conf/.truststore
-    truststore_password: cassandra
\ No newline at end of file
+    truststore_password: cassandra

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/AutoSavingCache.java Mon Apr 11 08:36:55 2011
@@ -22,6 +22,7 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -34,7 +35,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
@@ -46,6 +47,9 @@ public abstract class AutoSavingCache<K,
 {
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
 
+    /** True if a cache flush is currently executing: only one may execute at a time. */
+    public static final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
     protected final String cfName;
     protected final String tableName;
     protected volatile ScheduledFuture<?> saveTask;
@@ -75,7 +79,7 @@ public abstract class AutoSavingCache<K,
 
     public Writer getWriter()
     {
-        return new Writer();
+        return new Writer(tableName, cfName);
     }
 
     public void scheduleSaving(int savePeriodInSeconds)
@@ -184,22 +188,34 @@ public abstract class AutoSavingCache<K,
         }
     }
 
-    public class Writer implements ICompactionInfo
+    public class Writer implements CompactionInfo.Holder
     {
         private final Set<K> keys;
+        private final CompactionInfo info;
         private final long estimatedTotalBytes;
         private long bytesWritten;
 
-        private Writer()
+        private Writer(String ksname, String cfname)
         {
             keys = getKeySet();
-
             long bytes = 0;
             for (K key : keys)
                 bytes += translateKey(key).remaining();
-
             // an approximation -- the keyset can change while saving
             estimatedTotalBytes = bytes;
+            info = new CompactionInfo(ksname,
+                                      cfname,
+                                      "Save " + getCachePath().getName(),
+                                      0,
+                                      estimatedTotalBytes);
+        }
+
+        public CompactionInfo getCompactionInfo()
+        {
+            long bytesWritten = this.bytesWritten;
+            // keyset can change in size, thus totalBytes can too
+            return info.forProgress(bytesWritten,
+                                    Math.max(bytesWritten, estimatedTotalBytes));
         }
 
         public void saveCache() throws IOException
@@ -238,26 +254,5 @@ public abstract class AutoSavingCache<K,
             logger.info(String.format("Saved %s (%d items) in %d ms",
                         path.getName(), keys.size(), (System.currentTimeMillis() - start)));
         }
-
-        public long getTotalBytes()
-        {
-            // keyset can change in size, thus totalBytes can too
-            return Math.max(estimatedTotalBytes, getBytesComplete());
-        }
-
-        public long getBytesComplete()
-        {
-            return bytesWritten;
-        }
-
-        public String getTaskType()
-        {
-            return "Save " + getCachePath().getName();
-        }
-
-        public String getColumnFamily()
-        {
-            return cfName;
-        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Mon Apr 11 08:36:55 2011
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.CompactionInfo;
 import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.*;
@@ -1548,16 +1549,15 @@ public class CliClient extends CliUserHe
             // compaction manager information
             if (compactionManagerMBean != null)
             {
-                String compactionType = compactionManagerMBean.getCompactionType();
-
-                // if ongoing compaction type is index build
-                if (compactionType != null && compactionType.contains("index build"))
+                for (CompactionInfo info : compactionManagerMBean.getCompactions())
                 {
-                    String indexName         = compactionManagerMBean.getColumnFamilyInProgress();
-                    long bytesCompacted      = compactionManagerMBean.getBytesCompacted();
-                    long totalBytesToProcess = compactionManagerMBean.getBytesTotalInProgress();
-
-                    sessionState.out.printf("%nCurrently building index %s, completed %d of %d bytes.%n", indexName, bytesCompacted, totalBytesToProcess);
+                    // if ongoing compaction type is index build
+                    if (!info.getTaskType().contains("index build"))
+                        continue;
+                    sessionState.out.printf("%nCurrently building index %s, completed %d of %d bytes.%n",
+                                            info.getColumnFamily(),
+                                            info.getBytesComplete(),
+                                            info.getTotalBytes());
                 }
             }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Mon Apr 11 08:36:55 2011
@@ -82,6 +82,7 @@ public class Config
     /* if the size of columns or super-columns are more than this, indexing will kick in */
     public Integer column_index_size_in_kb = 64;
     public Integer in_memory_compaction_limit_in_mb = 256;
+    public Boolean compaction_multithreading = true;
     
     public String[] data_file_directories;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Mon Apr 11 08:36:55 2011
@@ -340,7 +340,10 @@ public class DatabaseDescriptor
             {
                 throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
             }
-            
+
+            if (conf.compaction_multithreading == null)
+                conf.compaction_multithreading = true;
+
             /* data file and commit log directories. they get created later, when they're needed. */
             if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
             {
@@ -722,7 +725,12 @@ public class DatabaseDescriptor
     {
         return conf.in_memory_compaction_limit_in_mb * 1024 * 1024;
     }
-    
+
+    public static boolean getCompactionMultithreading()
+    {
+        return conf.compaction_multithreading;
+    }
+
     public static String[] getAllDataFileLocations()
     {
         return conf.data_file_directories;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Apr 11 08:36:55 2011
@@ -1038,6 +1038,14 @@ public class ColumnFamilyStore implement
         return data.getMemtable();
     }
 
+    /**
+     * Package protected for access from the CompactionManager.
+     */
+    DataTracker getDataTracker()
+    {
+        return data;
+    }
+
     public Collection<SSTableReader> getSSTables()
     {
         return data.getSSTables();
@@ -1754,7 +1762,7 @@ public class ColumnFamilyStore implement
      */
     void clearUnsafe()
     {
-        data.clearUnsafe();
+        data.init();
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Apr 11 08:36:55 2011
@@ -28,8 +28,9 @@ import java.security.MessageDigest;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.*;
@@ -58,13 +60,22 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+/**
+ * A singleton which manages a private executor of ongoing compactions. A readwrite lock
+ * controls whether compactions can proceed: an external consumer can completely stop
+ * compactions by acquiring the write half of the lock via getCompactionLock().
+ *
+ * Scheduling for compaction is accomplished by swapping sstables to be compacted into
+ * a set via DataTracker. New scheduling attempts will ignore currently compacting
+ * sstables.
+ */
 public class CompactionManager implements CompactionManagerMBean
 {
     public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
     private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
     public static final CompactionManager instance;
-    private final ReentrantLock compactionLock = new ReentrantLock();
-    // todo: should provide a way to unlock in mbean?
+    // acquire as read to perform a compaction, and as write to prevent compactions
+    private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
 
     static
     {
@@ -83,9 +94,12 @@ public class CompactionManager implement
     private CompactionExecutor executor = new CompactionExecutor();
     private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
 
+    /**
+     * @return A lock, for which acquisition means no compactions can run.
+     */
     public Lock getCompactionLock()
     {
-        return compactionLock;
+        return compactionLock.writeLock();
     }
 
     /**
@@ -99,7 +113,7 @@ public class CompactionManager implement
         {
             public Integer call() throws IOException
             {
-                compactionLock.lock();
+                compactionLock.readLock().lock();
                 try
                 {
                     if (cfs.isInvalid())
@@ -115,26 +129,32 @@ public class CompactionManager implement
                     logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful");
                     Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
                     updateEstimateFor(cfs, buckets);
+                    int gcBefore = cfs.isIndex() ? Integer.MAX_VALUE : getDefaultGcBefore(cfs);
                     
                     for (List<SSTableReader> sstables : buckets)
                     {
-                        if (sstables.size() >= minThreshold)
+                        if (sstables.size() < minThreshold)
+                            continue;
+                        // if we have too many to compact all at once, compact older ones first -- this avoids
+                        // re-compacting files we just created.
+                        Collections.sort(sstables);
+                        Collection<SSTableReader> tocompact = cfs.getDataTracker().markCompacting(sstables, minThreshold, maxThreshold);
+                        if (tocompact == null)
+                            // enough threads are busy in this bucket
+                            continue;
+                        try
+                        {
+                            return doCompaction(cfs, tocompact, gcBefore);
+                        }
+                        finally
                         {
-                            // if we have too many to compact all at once, compact older ones first -- this avoids
-                            // re-compacting files we just created.
-                            Collections.sort(sstables);
-                            int gcBefore = cfs.isIndex()
-                                         ? Integer.MAX_VALUE
-                                         : getDefaultGcBefore(cfs);
-                            return doCompaction(cfs,
-                                                sstables.subList(0, Math.min(sstables.size(), maxThreshold)),
-                                                gcBefore);
+                            cfs.getDataTracker().unmarkCompacting(tocompact);
                         }
                     }
                 }
                 finally 
                 {
-                    compactionLock.unlock();
+                    compactionLock.readLock().unlock();
                 }
                 return 0;
             }
@@ -171,16 +191,40 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                compactionLock.lock();
+                // acquire the write lock to schedule all sstables
+                compactionLock.writeLock().lock();
                 try 
                 {
-                    if (!cfStore.isInvalid())
-                        doCleanupCompaction(cfStore, renewer);
+                    if (cfStore.isInvalid())
+                        return this;
+                    Collection<SSTableReader> tocleanup = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
+                    if (tocleanup == null || tocleanup.isEmpty())
+                        return this;
+                    try
+                    {
+                        // downgrade the lock acquisition
+                        compactionLock.readLock().lock();
+                        compactionLock.writeLock().unlock();
+                        try
+                        {
+                            doCleanupCompaction(cfStore, tocleanup, renewer);
+                        }
+                        finally
+                        {
+                            compactionLock.readLock().unlock();
+                        }
+                    }
+                    finally
+                    {
+                        cfStore.getDataTracker().unmarkCompacting(tocleanup);
+                    }
                     return this;
                 }
                 finally 
                 {
-                    compactionLock.unlock();
+                    // we probably already downgraded
+                    if (compactionLock.writeLock().isHeldByCurrentThread())
+                        compactionLock.writeLock().unlock();
                 }
             }
         };
@@ -193,16 +237,41 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                compactionLock.lock();
+                // acquire the write lock to schedule all sstables
+                compactionLock.writeLock().lock();
                 try
                 {
-                    if (!cfStore.isInvalid())
-                        doScrub(cfStore);
+                    if (cfStore.isInvalid())
+                        return this;
+
+                    Collection<SSTableReader> toscrub = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
+                    if (toscrub == null || toscrub.isEmpty())
+                        return this;
+                    try
+                    {
+                        // downgrade the lock acquisition
+                        compactionLock.readLock().lock();
+                        compactionLock.writeLock().unlock();
+                        try
+                        {
+                            doScrub(cfStore, toscrub);
+                        }
+                        finally
+                        {
+                            compactionLock.readLock().unlock();
+                        }
+                    }
+                    finally
+                    {
+                        cfStore.getDataTracker().unmarkCompacting(toscrub);
+                    }
                     return this;
                 }
                 finally
                 {
-                    compactionLock.unlock();
+                    // we probably already downgraded
+                    if (compactionLock.writeLock().isHeldByCurrentThread())
+                        compactionLock.writeLock().unlock();
                 }
             }
         };
@@ -220,7 +289,8 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                compactionLock.lock();
+                // acquire the write lock long enough to schedule all sstables
+                compactionLock.writeLock().lock();
                 try
                 {
                     if (cfStore.isInvalid())
@@ -241,13 +311,35 @@ public class CompactionManager implement
                     {
                         sstables = cfStore.getSSTables();
                     }
-    
-                    doCompaction(cfStore, sstables, gcBefore);
+
+                    Collection<SSTableReader> tocompact = cfStore.getDataTracker().markCompacting(sstables, 0, Integer.MAX_VALUE);
+                    if (tocompact == null || tocompact.isEmpty())
+                        return this;
+                    try
+                    {
+                        // downgrade the lock acquisition
+                        compactionLock.readLock().lock();
+                        compactionLock.writeLock().unlock();
+                        try
+                        {
+                            doCompaction(cfStore, tocompact, gcBefore);
+                        }
+                        finally
+                        {
+                            compactionLock.readLock().unlock();
+                        }
+                    }
+                    finally
+                    {
+                        cfStore.getDataTracker().unmarkCompacting(tocompact);
+                    }
                     return this;
                 }
-                finally 
+                finally
                 {
-                    compactionLock.unlock();
+                    // we probably already downgraded
+                    if (compactionLock.writeLock().isHeldByCurrentThread())
+                        compactionLock.writeLock().unlock();
                 }
             }
         };
@@ -293,7 +385,7 @@ public class CompactionManager implement
         {
             public Object call() throws IOException
             {
-                compactionLock.lock();
+                compactionLock.readLock().lock();
                 try
                 {
                     if (cfs.isInvalid())
@@ -320,16 +412,29 @@ public class CompactionManager implement
                     {
                         logger.error("No file to compact for user defined compaction");
                     }
+                    // attempt to schedule the set
+                    else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
+                    {
+                        // success: perform the compaction
+                        try
+                        {
+                            doCompaction(cfs, sstables, gcBefore);
+                        }
+                        finally
+                        {
+                            cfs.getDataTracker().unmarkCompacting(sstables);
+                        }
+                    }
                     else
                     {
-                        doCompaction(cfs, sstables, gcBefore);
+                        logger.error("SSTables for user defined compaction are already being compacted.");
                     }
 
                     return this;
                 }
                 finally
                 {
-                    compactionLock.unlock();
+                    compactionLock.readLock().unlock();
                 }
             }
         };
@@ -350,13 +455,16 @@ public class CompactionManager implement
         return null;
     }
 
+    /**
+     * Does not mutate data, so is not scheduled.
+     */
     public Future<Object> submitValidation(final ColumnFamilyStore cfStore, final AntiEntropyService.Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {
             public Object call() throws IOException
             {
-                compactionLock.lock();
+                compactionLock.readLock().lock();
                 try
                 {
                     if (!cfStore.isInvalid())
@@ -365,7 +473,7 @@ public class CompactionManager implement
                 }
                 finally
                 {
-                    compactionLock.unlock();
+                    compactionLock.readLock().unlock();
                 }
             }
         };
@@ -397,7 +505,6 @@ public class CompactionManager implement
             table.snapshot("compact-" + cfs.columnFamily);
 
         // sanity check: all sstables must belong to the same cfs
-        logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
         for (SSTableReader sstable : sstables)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
@@ -422,6 +529,8 @@ public class CompactionManager implement
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
         boolean major = cfs.isCompleteSSTables(sstables);
+        String type = major ? "Major" : "Minor";
+        logger.info("Compacting {}: {}", type, sstables);
 
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
@@ -433,12 +542,11 @@ public class CompactionManager implement
 
         SSTableWriter writer;
         CompactionController controller = new CompactionController(cfs, sstables, major, gcBefore, false);
-        CompactionIterator ci = new CompactionIterator(sstables, controller); // retain a handle so we can call close()
+        CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-        executor.beginCompaction(cfs.columnFamily, ci);
-
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
+        executor.beginCompaction(ci);
         try
         {
             if (!nni.hasNext())
@@ -473,6 +581,7 @@ public class CompactionManager implement
         finally
         {
             ci.close();
+            executor.finishCompaction(ci);
         }
 
         SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
@@ -508,11 +617,11 @@ public class CompactionManager implement
      *
      * @throws IOException
      */
-    private void doScrub(ColumnFamilyStore cfs) throws IOException
+    private void doScrub(ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
     {
         assert !cfs.isIndex();
 
-        for (final SSTableReader sstable : cfs.getSSTables())
+        for (final SSTableReader sstable : sstables)
         {
             logger.info("Scrubbing " + sstable);
 
@@ -538,7 +647,7 @@ public class CompactionManager implement
             }
 
             SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
-            executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+            executor.beginCompaction(new ScrubInfo(dataFile, sstable));
             int goodRows = 0, badRows = 0, emptyRows = 0;
 
             while (!dataFile.isEOF())
@@ -682,7 +791,7 @@ public class CompactionManager implement
      *
      * @throws IOException
      */
-    private void doCleanupCompaction(ColumnFamilyStore cfs, NodeId.OneShotRenewer renewer) throws IOException
+    private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, NodeId.OneShotRenewer renewer) throws IOException
     {
         assert !cfs.isIndex();
         Table table = cfs.table;
@@ -690,19 +799,12 @@ public class CompactionManager implement
         boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
         if (ranges.isEmpty())
         {
-            logger.info("Cleanup cannot be ran before the node join the ring");
+            logger.info("Cleanup cannot run before a node has joined the ring");
             return;
         }
 
-        for (SSTableReader sstable : cfs.getSSTables())
+        for (SSTableReader sstable : sstables)
         {
-            logger.info("Cleaning up " + sstable);
-            // Calculate the expected compacted filesize
-            long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
-            String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
-            if (compactionFileLocation == null)
-                throw new IOException("disk full");
-
             long startTime = System.currentTimeMillis();
             long totalkeysWritten = 0;
 
@@ -712,23 +814,31 @@ public class CompactionManager implement
               logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
             SSTableWriter writer = null;
-            SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
-            SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
-            executor.beginCompaction(cfs.columnFamily, new CleanupInfo(sstable, scanner));
             try
             {
-                while (scanner.hasNext())
+                logger.info("Cleaning up " + sstable);
+                // Calculate the expected compacted filesize
+                long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+                String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
+                if (compactionFileLocation == null)
+                    throw new IOException("disk full");
+
+                SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+                SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+                CleanupInfo ci = new CleanupInfo(sstable, scanner);
+                executor.beginCompaction(ci);
+                try
                 {
-                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                    if (Range.isTokenInRanges(row.getKey().token, ranges))
+                    while (scanner.hasNext())
                     {
-                        writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
-                        writer.append(getCompactedRow(row, sstable.descriptor, false));
-                        totalkeysWritten++;
-                    }
-                    else
-                    {
-                        if (!indexedColumns.isEmpty() || isCommutative)
+                        SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                        if (Range.isTokenInRanges(row.getKey().token, ranges))
+                        {
+                            writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
+                            writer.append(getCompactedRow(row, sstable.descriptor, false));
+                            totalkeysWritten++;
+                        }
+                        else if (!indexedColumns.isEmpty() || isCommutative)
                         {
                             while (row.hasNext())
                             {
@@ -741,10 +851,15 @@ public class CompactionManager implement
                         }
                     }
                 }
+                finally
+                {
+                    scanner.close();
+                    executor.finishCompaction(ci);
+                }
             }
             finally
             {
-                scanner.close();
+                cfs.getDataTracker().unmarkCompacting(Arrays.asList(sstable));
             }
 
             List<SSTableReader> results = new ArrayList<SSTableReader>();
@@ -828,7 +943,7 @@ public class CompactionManager implement
         }
 
         CompactionIterator ci = new ValidationCompactionIterator(cfs, validator.request.range);
-        executor.beginCompaction(cfs.columnFamily, ci);
+        executor.beginCompaction(ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -845,6 +960,7 @@ public class CompactionManager implement
         finally
         {
             ci.close();
+            executor.finishCompaction(ci);
         }
     }
 
@@ -912,23 +1028,33 @@ public class CompactionManager implement
         return tablePairs;
     }
     
+    /**
+     * Is not scheduled, because it is performing disjoint work from sstable compaction.
+     */
     public Future submitIndexBuild(final ColumnFamilyStore cfs, final Table.IndexBuilder builder)
     {
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                compactionLock.lock();
+                compactionLock.readLock().lock();
                 try
                 {
                     if (cfs.isInvalid())
                         return;
-                    executor.beginCompaction(cfs.columnFamily, builder);
-                    builder.build();
+                    executor.beginCompaction(builder);
+                    try
+                    {
+                        builder.build();
+                    }
+                    finally
+                    {
+                        executor.finishCompaction(builder);
+                    }
                 }
                 finally
                 {
-                    compactionLock.unlock();
+                    compactionLock.readLock().unlock();
                 }
             }
         };
@@ -937,12 +1063,15 @@ public class CompactionManager implement
         // future that will be immediately immediately get()ed and executed. Happens during a migration, which locks
         // the compaction thread and then reinitializes a ColumnFamilyStore. Under normal circumstances, CFS spawns
         // index jobs to the compaction manager (this) and blocks on them.
-        if (compactionLock.isHeldByCurrentThread())
+        if (compactionLock.isWriteLockedByCurrentThread())
             return new SimpleFuture(runnable);
         else
             return executor.submit(runnable);
     }
 
+    /**
+     * Submits an sstable to be rebuilt: is not scheduled, since the sstable must not exist.
+     */
     public Future<SSTableReader> submitSSTableBuild(final Descriptor desc, OperationType type)
     {
         // invalid descriptions due to missing or dropped CFS are handled by SSTW and StreamInSession.
@@ -951,15 +1080,22 @@ public class CompactionManager implement
         {
             public SSTableReader call() throws IOException
             {
-                compactionLock.lock();
+                compactionLock.readLock().lock();
                 try
                 {
-                    executor.beginCompaction(desc.cfname, builder);
-                    return builder.build();
+                    executor.beginCompaction(builder);
+                    try
+                    {
+                        return builder.build();
+                    }
+                    finally
+                    {
+                        executor.finishCompaction(builder);
+                    }
                 }
                 finally
                 {
-                    compactionLock.unlock();
+                    compactionLock.readLock().unlock();
                 }
             }
         };
@@ -972,8 +1108,27 @@ public class CompactionManager implement
         {
             public void runMayThrow() throws IOException
             {
-                executor.beginCompaction(writer.getColumnFamily(), writer);
-                writer.saveCache();
+                if (!AutoSavingCache.flushInProgress.compareAndSet(false, true))
+                {
+                    logger.debug("Cache flushing was already in progress: skipping {}", writer.getCompactionInfo());
+                    return;
+                }
+                try
+                {
+                    executor.beginCompaction(writer);
+                    try
+                    {
+                        writer.saveCache();
+                    }
+                    finally
+                    {
+                        executor.finishCompaction(writer);
+                    }
+                }
+                finally
+                {
+                    AutoSavingCache.flushInProgress.set(false);
+                }
             }
         };
         return executor.submit(runnable);
@@ -988,7 +1143,9 @@ public class CompactionManager implement
     {
         public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
         {
-            super(getCollatingIterator(cfs.getSSTables(), range), new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false));
+            super("Validation",
+                  getCollatingIterator(cfs.getSSTables(), range),
+                  new CompactionController(cfs, cfs.getSSTables(), true, getDefaultGcBefore(cfs), false));
         }
 
         protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws IOException
@@ -1000,12 +1157,6 @@ public class CompactionManager implement
             }
             return iter;
         }
-
-        @Override
-        public String getTaskType()
-        {
-            return "Validation";
-        }
     }
 
     public void checkAllColumnFamilies() throws IOException
@@ -1035,67 +1186,57 @@ public class CompactionManager implement
 
     private static class CompactionExecutor extends DebuggableThreadPoolExecutor
     {
-        private volatile String columnFamily;
-        private volatile ICompactionInfo ci;
+        // a synchronized identity set of running tasks to their compaction info
+        private final Set<CompactionInfo.Holder> compactions;
 
         public CompactionExecutor()
         {
-            super("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority());
-        }
-
-        @Override
-        public void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            columnFamily = null;
-            ci = null;
+            super(getThreadCount(),
+                  60,
+                  TimeUnit.SECONDS,
+                  new LinkedBlockingQueue<Runnable>(),
+                  new NamedThreadFactory("CompactionExecutor", DatabaseDescriptor.getCompactionThreadPriority()));
+            Map<CompactionInfo.Holder, Boolean> cmap = new IdentityHashMap<CompactionInfo.Holder, Boolean>();
+            compactions = Collections.synchronizedSet(Collections.newSetFromMap(cmap));
         }
 
-        void beginCompaction(String columnFamily, ICompactionInfo ci)
+        private static int getThreadCount()
         {
-            this.columnFamily = columnFamily;
-            this.ci = ci;
+            if (!DatabaseDescriptor.getCompactionMultithreading())
+                return 1;
+            return Math.max(2, Runtime.getRuntime().availableProcessors());
         }
 
-        public String getColumnFamilyName()
+        void beginCompaction(CompactionInfo.Holder ci)
         {
-            return columnFamily == null ? null : columnFamily;
+            compactions.add(ci);
         }
 
-        public Long getBytesTotal()
+        void finishCompaction(CompactionInfo.Holder ci)
         {
-            return ci == null ? null : ci.getTotalBytes();
+            compactions.remove(ci);
         }
 
-        public Long getBytesCompleted()
+        public List<CompactionInfo.Holder> getCompactions()
         {
-            return ci == null ? null : ci.getBytesComplete();
+            return new ArrayList<CompactionInfo.Holder>(compactions);
         }
-
-        public String getType()
-        {
-            return ci == null ? null : ci.getTaskType();
-        }
-    }
-
-    public String getColumnFamilyInProgress()
-    {
-        return executor.getColumnFamilyName();
-    }
-
-    public Long getBytesTotalInProgress()
-    {
-        return executor.getBytesTotal();
     }
 
-    public Long getBytesCompacted()
+    public List<CompactionInfo> getCompactions()
     {
-        return executor.getBytesCompleted();
+        List<CompactionInfo> out = new ArrayList<CompactionInfo>();
+        for (CompactionInfo.Holder ci : executor.getCompactions())
+            out.add(ci.getCompactionInfo());
+        return out;
     }
 
-    public String getCompactionType()
+    public List<String> getCompactionSummary()
     {
-        return executor.getType();
+        List<String> out = new ArrayList<String>();
+        for (CompactionInfo.Holder ci : executor.getCompactions())
+            out.add(ci.getCompactionInfo().toString());
+        return out;
     }
 
     public int getPendingTasks()
@@ -1182,64 +1323,57 @@ public class CompactionManager implement
         }
     }
 
-    private static class CleanupInfo implements ICompactionInfo
+    private static class CleanupInfo implements CompactionInfo.Holder
     {
         private final SSTableReader sstable;
         private final SSTableScanner scanner;
-
         public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
         {
             this.sstable = sstable;
             this.scanner = scanner;
         }
 
-        public long getTotalBytes()
-        {
-            return scanner.getFileLength();
-        }
-
-        public long getBytesComplete()
-        {
-            return scanner.getFilePointer();
-        }
-
-        public String getTaskType()
+        public CompactionInfo getCompactionInfo()
         {
-            return "Cleanup of " + sstable.getColumnFamilyName();
+            try
+            {
+                return new CompactionInfo(sstable.descriptor.ksname,
+                                          sstable.descriptor.cfname,
+                                          "Cleanup of " + sstable.getColumnFamilyName(),
+                                          scanner.getFilePointer(),
+                                          scanner.getFileLength());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 
-    private static class ScrubInfo implements ICompactionInfo
+    private static class ScrubInfo implements CompactionInfo.Holder
     {
         private final BufferedRandomAccessFile dataFile;
         private final SSTableReader sstable;
-
         public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;
         }
 
-        public long getTotalBytes()
+        public CompactionInfo getCompactionInfo()
         {
             try
             {
-                return dataFile.length();
+                return new CompactionInfo(sstable.descriptor.ksname,
+                                          sstable.descriptor.cfname,
+                                          "Scrub " + sstable,
+                                          dataFile.getFilePointer(),
+                                          dataFile.length());
             }
             catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
         }
-
-        public long getBytesComplete()
-        {
-            return dataFile.getFilePointer();
-        }
-
-        public String getTaskType()
-        {
-            return "Scrub " + sstable;
-        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java Mon Apr 11 08:36:55 2011
@@ -18,28 +18,17 @@
 
 package org.apache.cassandra.db;
 
-public interface CompactionManagerMBean
-{    
-
-    /**
-     * @return the columnfamily currently being compacted; null if none
-     */
-    public String getColumnFamilyInProgress();
+import java.util.List;
 
-    /**
-     * @return the total (data, not including index and filter) bytes being compacted; null if none
-     */
-    public Long getBytesTotalInProgress();
+import org.apache.cassandra.io.CompactionInfo;
 
-    /**
-     * @return the progress on the current compaction; null if none
-     */
-    public Long getBytesCompacted();
+public interface CompactionManagerMBean
+{
+    /** List of running compaction objects. */
+    public List<CompactionInfo> getCompactions();
 
-    /**
-     * @return the type of compaction operation currently in progress; null if none
-     */
-    public String getCompactionType();
+    /** List of running compaction summary strings. */
+    public List<String> getCompactionSummary();
 
     /**
      * @return estimated number of compactions remaining to perform

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Mon Apr 11 08:36:55 2011
@@ -54,7 +54,8 @@ public class DataTracker
     public DataTracker(ColumnFamilyStore cfstore)
     {
         this.cfstore = cfstore;
-        this.view = new AtomicReference<View>(new View(new Memtable(cfstore), Collections.<Memtable>emptySet(), Collections.<SSTableReader>emptySet()));
+        this.view = new AtomicReference<View>();
+        this.init();
     }
 
     public Memtable getMemtable()
@@ -154,6 +155,57 @@ public class DataTracker
         }
     }
 
+    /**
+     * @return A subset of the given active sstables that have been marked compacting,
+     * or null if the thresholds cannot be met: files that are marked compacting must
+     * later be unmarked using unmarkCompacting.
+     */
+    public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
+    {
+        if (max < min || max < 1)
+            return null;
+        View currentView, newView;
+        Set<SSTableReader> subset = null;
+        // order preserving set copy of the input
+        Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(tomark);
+        do
+        {
+            currentView = view.get();
+
+            // find the subset that is active and not already compacting
+            remaining.removeAll(currentView.compacting);
+            remaining.retainAll(currentView.sstables);
+            if (remaining.size() < min)
+                // cannot meet the min threshold
+                return null;
+
+            // cap the newly compacting items into a subset set
+            subset = new HashSet<SSTableReader>();
+            Iterator<SSTableReader> iter = remaining.iterator();
+            for (int added = 0; added < max && iter.hasNext(); added++)
+                subset.add(iter.next());
+
+            newView = currentView.markCompacting(subset);
+        }
+        while (!view.compareAndSet(currentView, newView));
+        return subset;
+    }
+
+    /**
+     * Removes files from compacting status: this is different from 'markCompacted'
+     * because it should be run regardless of whether a compaction succeeded.
+     */
+    public void unmarkCompacting(Collection<SSTableReader> unmark)
+    {
+        View currentView, newView;
+        do
+        {
+            currentView = view.get();
+            newView = currentView.unmarkCompacting(unmark);
+        }
+        while (!view.compareAndSet(currentView, newView));
+    }
+
     public void markCompacted(Collection<SSTableReader> sstables)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());
@@ -180,9 +232,13 @@ public class DataTracker
         replace(getSSTables(), Collections.<SSTableReader>emptyList());
     }
 
-    public void clearUnsafe()
+    /** (Re)initializes the tracker, purging all references. */
+    void init()
     {
-        view.set(new View(new Memtable(cfstore), Collections.<Memtable>emptySet(), Collections.<SSTableReader>emptySet()));
+        view.set(new View(new Memtable(cfstore),
+                          Collections.<Memtable>emptySet(),
+                          Collections.<SSTableReader>emptySet(),
+                          Collections.<SSTableReader>emptySet()));
     }
 
     private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
@@ -384,31 +440,34 @@ public class DataTracker
 
     /**
      * An immutable structure holding the current memtable, the memtables pending
-     * flush and the sstables for a column family.
+     * flush, the sstables for a column family, and the sstables that are active
+     * in compaction (a subset of the sstables).
      */
     static class View
     {
         public final Memtable memtable;
         public final Set<Memtable> memtablesPendingFlush;
         public final Set<SSTableReader> sstables;
+        public final Set<SSTableReader> compacting;
 
-        public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables)
+        public View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting)
         {
             this.memtable = memtable;
             this.memtablesPendingFlush = Collections.unmodifiableSet(pendingFlush);
             this.sstables = Collections.unmodifiableSet(sstables);
+            this.compacting = Collections.unmodifiableSet(compacting);
         }
 
         public View switchMemtable(Memtable newMemtable)
         {
             Set<Memtable> newPending = new HashSet<Memtable>(memtablesPendingFlush);
             newPending.add(memtable);
-            return new View(newMemtable, newPending, sstables);
+            return new View(newMemtable, newPending, sstables, compacting);
         }
 
         public View renewMemtable(Memtable newMemtable)
         {
-            return new View(newMemtable, memtablesPendingFlush, sstables);
+            return new View(newMemtable, memtablesPendingFlush, sstables, compacting);
         }
 
         public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
@@ -417,7 +476,7 @@ public class DataTracker
             Set<SSTableReader> newSSTables = new HashSet<SSTableReader>(sstables);
             newPendings.remove(flushedMemtable);
             newSSTables.add(newSSTable);
-            return new View(memtable, newPendings, newSSTables);
+            return new View(memtable, newPendings, newSSTables, compacting);
         }
 
         public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
@@ -425,7 +484,21 @@ public class DataTracker
             Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
             Iterables.addAll(sstablesNew, replacements);
             sstablesNew.removeAll(oldSSTables);
-            return new View(memtable, memtablesPendingFlush, sstablesNew);
+            return new View(memtable, memtablesPendingFlush, sstablesNew, compacting);
+        }
+
+        public View markCompacting(Collection<SSTableReader> tomark)
+        {
+            Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
+            compactingNew.addAll(tomark);
+            return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
+        }
+
+        public View unmarkCompacting(Collection<SSTableReader> tounmark)
+        {
+            Set<SSTableReader> compactingNew = new HashSet<SSTableReader>(compacting);
+            compactingNew.removeAll(tounmark);
+            return new View(memtable, memtablesPendingFlush, sstables, compactingNew);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Apr 11 08:36:55 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.commitlog
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.LocalToken;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableDeletingReference;
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -597,7 +597,7 @@ public class Table
         return replicationStrategy;
     }
 
-    public class IndexBuilder implements ICompactionInfo
+    public class IndexBuilder implements CompactionInfo.Holder
     {
         private final ColumnFamilyStore cfs;
         private final SortedSet<ByteBuffer> columns;
@@ -610,6 +610,15 @@ public class Table
             this.iter = iter;
         }
 
+        public CompactionInfo getCompactionInfo()
+        {
+            return new CompactionInfo(cfs.table.name,
+                                      cfs.columnFamily,
+                                      String.format("Secondary index build %s", cfs.columnFamily),
+                                      iter.getBytesRead(),   // CompactionInfo takes bytesComplete before totalBytes
+                                      iter.getTotalBytes());
+        }
+
         public void build()
         {
             while (iter.hasNext())
@@ -646,21 +655,6 @@ public class Table
                 throw new RuntimeException(e);
             }
         }
-
-        public long getTotalBytes()
-        {
-            return iter.getTotalBytes();
-        }
-
-        public long getBytesComplete()
-        {
-            return iter.getBytesRead();
-        }
-
-        public String getTaskType()
-        {
-            return String.format("Secondary index build %s", cfs.columnFamily);
-        }
     }
 
     private Object indexLockFor(ByteBuffer key)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionController.java Mon Apr 11 08:36:55 2011
@@ -61,6 +61,18 @@ public class CompactionController
         return forceDeserialize ? basicDeserializingController : basicController;
     }
 
+    /** @return The keyspace name: only valid if created with a non-null CFS. */
+    public String getKeyspace()
+    {
+        return cfs != null ? cfs.table.name : "n/a";
+    }
+
+    /** @return The column family name: only valid if created with a non-null CFS. */
+    public String getColumnFamily()
+    {
+        return cfs != null ? cfs.columnFamily : "n/a";
+    }
+
     public boolean shouldPurge(DecoratedKey key)
     {
         return isMajor || (cfs != null && !cfs.isKeyInRemainingSSTables(key, sstables));

Added: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java?rev=1090978&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionInfo.java Mon Apr 11 08:36:55 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.Serializable;
+
+/**
+ * An immutable snapshot of the progress of a running operation (compaction, cleanup,
+ * scrub, index build, ...). Implements Serializable so that structured info can be
+ * returned via JMX.
+ */
+public final class CompactionInfo implements Serializable
+{
+    // pinned so separately compiled JMX clients and servers agree on the serialized form
+    private static final long serialVersionUID = 1L;
+
+    private final String ksname;
+    private final String cfname;
+    private final String tasktype;
+    private final long bytesComplete;
+    private final long totalBytes;
+
+    /**
+     * @param ksname keyspace name
+     * @param cfname column family name
+     * @param tasktype human-readable description of the operation
+     * @param bytesComplete bytes processed so far; note that this precedes totalBytes
+     * @param totalBytes total bytes to process
+     */
+    public CompactionInfo(String ksname, String cfname, String tasktype, long bytesComplete, long totalBytes)
+    {
+        this.ksname = ksname;
+        this.cfname = cfname;
+        this.tasktype = tasktype;
+        this.bytesComplete = bytesComplete;
+        this.totalBytes = totalBytes;
+    }
+
+    /** @return A copy of this CompactionInfo with updated progress. */
+    public CompactionInfo forProgress(long bytesComplete, long totalBytes)
+    {
+        return new CompactionInfo(ksname, cfname, tasktype, bytesComplete, totalBytes);
+    }
+
+    public String getKeyspace()
+    {
+        return ksname;
+    }
+
+    public String getColumnFamily()
+    {
+        return cfname;
+    }
+
+    public long getBytesComplete()
+    {
+        return bytesComplete;
+    }
+
+    public long getTotalBytes()
+    {
+        return totalBytes;
+    }
+
+    public String getTaskType()
+    {
+        return tasktype;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder buff = new StringBuilder();
+        buff.append(getTaskType()).append('@').append(hashCode());
+        buff.append('(').append(getKeyspace()).append(", ").append(getColumnFamily());
+        buff.append(", ").append(getBytesComplete()).append('/').append(getTotalBytes());
+        return buff.append(')').toString();
+    }
+
+    /** Implemented by running operations that can report their progress as a CompactionInfo. */
+    public interface Holder
+    {
+        public CompactionInfo getCompactionInfo();
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Mon Apr 11 08:36:55 2011
@@ -45,28 +45,30 @@ import org.apache.cassandra.utils.FBUtil
 import org.apache.cassandra.utils.ReducingIterator;
 
 public class CompactionIterator extends ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
-implements Closeable, ICompactionInfo
+implements Closeable, CompactionInfo.Holder
 {
     private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
 
     public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+    protected final String type;
     protected final CompactionController controller;
 
     private long totalBytes;
     private long bytesRead;
     private long row;
 
-    public CompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+    public CompactionIterator(String type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
     {
-        this(getCollatingIterator(sstables), controller);
+        this(type, getCollatingIterator(sstables), controller);
     }
 
     @SuppressWarnings("unchecked")
-    protected CompactionIterator(Iterator iter, CompactionController controller)
+    protected CompactionIterator(String type, Iterator iter, CompactionController controller)
     {
         super(iter);
+        this.type = type;
         this.controller = controller;
         row = 0;
         totalBytes = bytesRead = 0;
@@ -88,6 +90,15 @@ implements Closeable, ICompactionInfo
         return iter;
     }
 
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(controller.getKeyspace(),
+                                  controller.getColumnFamily(),
+                                  type,
+                                  bytesRead,
+                                  totalBytes);
+    }
+
     @Override
     protected boolean isEqual(SSTableIdentityIterator o1, SSTableIdentityIterator o2)
     {
@@ -160,18 +171,8 @@ implements Closeable, ICompactionInfo
         return ((CollatingIterator)source).getIterators();
     }
 
-    public long getTotalBytes()
-    {
-        return totalBytes;
-    }
-
-    public long getBytesComplete()
-    {
-        return bytesRead;
-    }
-
-    public String getTaskType()
+    public String toString()
     {
-        return controller.isMajor ? "Major" : "Minor";
+        return this.getCompactionInfo().toString();
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon Apr 11 08:36:55 2011
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.CompactionController;
-import org.apache.cassandra.io.ICompactionInfo;
+import org.apache.cassandra.io.CompactionInfo;
 import org.apache.cassandra.io.LazilyCompactedRow;
 import org.apache.cassandra.io.PrecompactedRow;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
@@ -249,7 +249,7 @@ public class SSTableWriter extends SSTab
      * Removes the given SSTable from temporary status and opens it, rebuilding the
      * bloom filter and row index from the data file.
      */
-    public static class Builder implements ICompactionInfo
+    public static class Builder implements CompactionInfo.Holder
     {
         private final Descriptor desc;
         private final OperationType type;
@@ -263,6 +263,24 @@ public class SSTableWriter extends SSTab
             cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
         }
 
+        public CompactionInfo getCompactionInfo()
+        {
+            maybeOpenIndexer();
+            try
+            {
+                // both file offsets are still valid post-close
+                return new CompactionInfo(desc.ksname,
+                                          desc.cfname,
+                                          "SSTable rebuild",
+                                          indexer.dfile.getFilePointer(),
+                                          indexer.dfile.length());
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
         // lazy-initialize the file to avoid opening it until it's actually executing on the CompactionManager,
         // since the 8MB buffers can use up heap quickly
         private void maybeOpenIndexer()
@@ -301,32 +319,6 @@ public class SSTableWriter extends SSTab
             logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
             return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
         }
-
-        public long getTotalBytes()
-        {
-            maybeOpenIndexer();
-            try
-            {
-                // (length is still valid post-close)
-                return indexer.dfile.length();
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-
-        public long getBytesComplete()
-        {
-            maybeOpenIndexer();
-            // (getFilePointer is still valid post-close)
-            return indexer.dfile.getFilePointer();
-        }
-
-        public String getTaskType()
-        {
-            return "SSTable rebuild";
-        }
     }
 
     static class RowIndexer

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Mon Apr 11 08:36:55 2011
@@ -40,6 +40,7 @@ import org.apache.cassandra.concurrent.I
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.CompactionInfo;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
@@ -324,10 +325,15 @@ public class NodeCmd
     public void printCompactionStats(PrintStream outs)
     {
         CompactionManagerMBean cm = probe.getCompactionManagerProxy();
-        outs.println("compaction type: " + (cm.getCompactionType() == null ? "n/a" : cm.getCompactionType()));
-        outs.println("column family: " + (cm.getColumnFamilyInProgress() == null ? "n/a" : cm.getColumnFamilyInProgress()));
-        outs.println("bytes compacted: " + (cm.getBytesCompacted() == null ? "n/a" : cm.getBytesCompacted()));
-        outs.println("bytes total in progress: " + (cm.getBytesTotalInProgress() == null ? "n/a" : cm.getBytesTotalInProgress() ));
+        for (CompactionInfo c : cm.getCompactions())
+        {
+            outs.println("compaction type: " + c.getTaskType());
+            outs.println("keyspace: " + c.getKeyspace());
+            outs.println("column family: " + c.getColumnFamily());
+            outs.println("bytes compacted: " + c.getBytesComplete());
+            outs.println("bytes total: " + c.getTotalBytes());
+            outs.println("-----------------");
+        }
         outs.println("pending tasks: " + cm.getPendingTasks());
     }
  

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Mon Apr 11 08:36:55 2011
@@ -46,18 +46,23 @@ public class CompactionsTest extends Cle
     public static final String TABLE2 = "Keyspace2";
     public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
 
+    public static final int MIN_COMPACTION_THRESHOLD = 2;
+
     @Test
     public void testCompactions() throws IOException, ExecutionException, InterruptedException
     {
-        CompactionManager.instance.disableAutoCompaction();
-
         // this test does enough rows to force multiple block indexes to be used
         Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
 
         final int ROWS_PER_SSTABLE = 10;
+        final int SSTABLES = (DatabaseDescriptor.getIndexInterval() * 3 / ROWS_PER_SSTABLE);
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
         Set<DecoratedKey> inserted = new HashSet<DecoratedKey>();
-        for (int j = 0; j < (DatabaseDescriptor.getIndexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+        for (int j = 0; j < SSTABLES; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 DecoratedKey key = Util.dk(String.valueOf(i % 2));
                 RowMutation rm = new RowMutation(TABLE1, key.key);
@@ -68,10 +73,22 @@ public class CompactionsTest extends Cle
             store.forceBlockingFlush();
             assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(store).size());
         }
+        // re-enable compaction with thresholds low enough to force a few rounds
+        store.setMinimumCompactionThreshold(2);
+        store.setMaximumCompactionThreshold(4);
+        // loop submitting parallel compactions until they all return 0
         while (true)
         {
-            Future<Integer> ft = CompactionManager.instance.submitMinorIfNeeded(store);
-            if (ft.get() == 0)
+            ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+            for (int i = 0; i < 10; i++)
+                compactions.add(CompactionManager.instance.submitMinorIfNeeded(store));
+            // another compaction attempt will be launched in the background by
+            // each completing compaction: not much we can do to control them here
+            boolean progress = false;
+            for (Future<Integer> compaction : compactions)
+               if (compaction.get() > 0)
+                   progress = true;
+            if (!progress)
                 break;
         }
         if (store.getSSTables().size() > 1)
@@ -136,5 +153,4 @@ public class CompactionsTest extends Cle
         buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
         assertEquals(1, buckets.size());
     }
-
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1090978&r1=1090977&r2=1090978&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Mon Apr 11 08:36:55 2011
@@ -309,7 +309,7 @@ public class LazilyCompactedRowTest exte
     {
         public LazyCompactionIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
         {
-            super(sstables, controller);
+            super("Lazy", sstables, controller);
         }
 
         @Override
@@ -323,7 +323,7 @@ public class LazilyCompactedRowTest exte
     {
         public PreCompactingIterator(Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
         {
-            super(sstables, controller);
+            super("Pre", sstables, controller);
         }
 
         @Override