You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/07 18:31:53 UTC

svn commit: r1166255 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/columniterator/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/...

Author: jbellis
Date: Wed Sep  7 16:31:52 2011
New Revision: 1166255

URL: http://svn.apache.org/viewvc?rev=1166255&view=rev
Log:
parallel compaction
patch by jbellis; reviewed by slebresne for CASSANDRA-2901

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Sep  7 16:31:52 2011
@@ -58,6 +58,9 @@
  * expose the ability to repair the first range (as returned by the
    partitioner) of a node (CASSANDRA-2606)
  * Streams Compression (CASSANDRA-3015)
+ * add ability to use multiple threads during a single compaction
+   (CASSANDRA-2901)
+
 
 0.8.5
  * fix NPE when encryption_options is unspecified (CASSANDRA-3007)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Sep  7 16:31:52 2011
@@ -23,6 +23,8 @@ Features
       Columnfamily compaction_strategy=LeveledCompactionStrategy option.  
       Leveled compaction means you only need to keep a few MB of space free for 
       compaction instead of (in the worst case) 50%.
+    - Ability to use multiple threads during a single compaction.  See 
+      multithreaded_compaction in cassandra.yaml for more details.
     - Windows Service ("cassandra.bat install" to enable)
     - Hinted Handoff has two major improvements:
         - Hint replay is much more efficient thanks to a change in the data model

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Sep  7 16:31:52 2011
@@ -268,17 +268,27 @@ column_index_size_in_kb: 64
 in_memory_compaction_limit_in_mb: 64
 
 # Number of simultaneous compactions to allow, NOT including
-# validation "compactions" for anti-entropy repair. This defaults to
-# the number of cores. This can help preserve read performance in a
-# mixed read/write workload, by mitigating the tendency of small
-# sstables to accumulate during a single long running compactions. The
-# default is usually fine and if you experience problems with
-# compaction running too slowly or too fast, you should look at
+# validation "compactions" for anti-entropy repair.  Simultaneous
+# compactions can help preserve read performance in a mixed read/write
+# workload, by mitigating the tendency of small sstables to accumulate
+# during a single long-running compaction. The default is usually
+# fine and if you experience problems with compaction running too
+# slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
 #
-# Uncomment to make compaction mono-threaded.
+# This setting has no effect on LeveledCompactionStrategy.
+#
+# concurrent_compactors defaults to the number of cores.
+# Uncomment to make compaction mono-threaded, the pre-0.8 default.
 #concurrent_compactors: 1
 
+# Multi-threaded compaction. When enabled, each compaction will use
+# up to one thread per core, plus one thread per sstable being merged.
+# This is usually only useful for SSD-based hardware: otherwise, 
+# your concern is usually to get compaction to do LESS i/o (see:
+# compaction_throughput_mb_per_sec), not more.
+multithreaded_compaction: false
+
 # Throttles compaction to the given total throughput across the entire
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Sep  7 16:31:52 2011
@@ -83,9 +83,10 @@ public class Config
     public Integer in_memory_compaction_limit_in_mb = 256;
     public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors();
     public Integer compaction_throughput_mb_per_sec = 16;
+    public Boolean multithreaded_compaction = false;
 
     public Integer stream_throughput_outbound_megabits_per_sec;
-    
+
     public String[] data_file_directories;
 
     public String saved_caches_directory;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Sep  7 16:31:52 2011
@@ -674,6 +674,11 @@ public class DatabaseDescriptor
         return conf.concurrent_compactors;
     }
 
+    public static boolean isMultithreadedCompaction()
+    {
+        return conf.multithreaded_compaction;
+    }
+
     public static int getCompactionThroughputMbPerSec()
     {
         return conf.compaction_throughput_mb_per_sec;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java Wed Sep  7 16:31:52 2011
@@ -68,7 +68,7 @@ public class EchoedRow extends AbstractC
 
     public int columnCount()
     {
-        return row.columnCount;
+        return row.getColumnCount();
     }
 
     public long maxTimestamp()

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Wed Sep  7 16:31:52 2011
@@ -32,7 +32,8 @@ import org.apache.cassandra.utils.Closea
 public interface IColumnIterator extends CloseableIterator<IColumn>
 {
     /**
-     * @return An empty CF holding metadata for the row being iterated.
+     * @return A ColumnFamily holding metadata for the row being iterated.
+     * Do not modify this CF. Whether it is empty or not is implementation-dependent.
      */
     public abstract ColumnFamily getColumnFamily();
 

Added: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java Wed Sep  7 16:31:52 2011
@@ -0,0 +1,8 @@
+package org.apache.cassandra.db.columniterator;
+
+public interface ICountableColumnIterator extends IColumnIterator
+{
+    public int getColumnCount();
+
+    public void reset();
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java Wed Sep  7 16:31:52 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.compacti
  */
 
 
+import java.io.Closeable;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.security.MessageDigest;
@@ -42,12 +43,19 @@ public abstract class AbstractCompactedR
     }
 
     /**
-     * write the row (size + column index + filter + column data, but NOT row key) to @param out
+     * write the row (size + column index + filter + column data, but NOT row key) to @param out.
+     * It is an error to call this if isEmpty is true.  (Because the key is appended first,
+     * so we'd have an incomplete row written.)
+     *
+     * write() may change internal state; it is NOT valid to call write() or update() a second time.
      */
     public abstract long write(DataOutput out) throws IOException;
 
     /**
-     * update @param digest with the data bytes of the row (not including row key or row size)
+     * update @param digest with the data bytes of the row (not including row key or row size).
+     * May be called even if empty.
+     *
+     * update() may change internal state; it is NOT valid to call write() or update() a second time.
      */
     public abstract void update(MessageDigest digest);
 

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java Wed Sep  7 16:31:52 2011
@@ -0,0 +1,69 @@
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.Throttle;
+
+public abstract class AbstractCompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+{
+    public static final int FILE_BUFFER_SIZE = 1024 * 1024;
+
+    private static Logger logger = LoggerFactory.getLogger(CompactionIterable.class);
+
+    protected final CompactionType type;
+    protected final CompactionController controller;
+    protected long totalBytes;
+    protected volatile long bytesRead;
+
+    protected final Throttle throttle;
+
+    public AbstractCompactionIterable(CompactionController controller, CompactionType type)
+    {
+        this.controller = controller;
+        this.type = type;
+        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
+        {
+            /** @return Instantaneous throughput target in bytes per millisecond. */
+            public int targetThroughput()
+            {
+                if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+                    // throttling disabled
+                    return 0;
+                // total throughput
+                int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+                // per stream throughput (target bytes per MS)
+                return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+            }
+        });
+    }
+
+    protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
+    {
+        ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
+        for (SSTableReader sstable : sstables)
+            scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE));
+        return scanners;
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(this.hashCode(),
+                                  controller.getKeyspace(),
+                                  controller.getColumnFamily(),
+                                  type,
+                                  bytesRead,
+                                  totalBytes);
+    }
+
+    public abstract CloseableIterator<AbstractCompactedRow> iterator();
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java Wed Sep  7 16:31:52 2011
@@ -29,31 +29,27 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Throttle;
 
-public class CompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable extends AbstractCompactionIterable
 {
     private static Logger logger = LoggerFactory.getLogger(CompactionIterable.class);
 
-    public static final int FILE_BUFFER_SIZE = 1024 * 1024;
-
-    private MergeIterator<IColumnIterator, AbstractCompactedRow> source;
-    protected final CompactionType type;
+    private long row;
     private final List<SSTableScanner> scanners;
-    protected final CompactionController controller;
-    private final Throttle throttle;
 
-    private long totalBytes;
-    private long bytesRead;
-    private long row;
+    private static final Comparator<IColumnIterator> comparator = new Comparator<IColumnIterator>()
+    {
+        public int compare(IColumnIterator i1, IColumnIterator i2)
+        {
+            return i1.getKey().compareTo(i2.getKey());
+        }
+    };
 
     public CompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
     {
@@ -62,27 +58,12 @@ public class CompactionIterable implemen
 
     protected CompactionIterable(CompactionType type, List<SSTableScanner> scanners, CompactionController controller)
     {
-        this.type = type;
+        super(controller, type);
         this.scanners = scanners;
-        this.controller = controller;
         row = 0;
         totalBytes = bytesRead = 0;
         for (SSTableScanner scanner : scanners)
             totalBytes += scanner.getFileLength();
-        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
-        {
-            /** @return Instantaneous throughput target in bytes per millisecond. */
-            public int targetThroughput()
-            {
-                if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
-                    // throttling disabled
-                    return 0;
-                // total throughput
-                int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-                // per stream throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
-            }
-        });
     }
 
     protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
@@ -93,33 +74,23 @@ public class CompactionIterable implemen
         return scanners;
     }
 
-    public CompactionInfo getCompactionInfo()
-    {
-        return new CompactionInfo(this.hashCode(),
-                                  controller.getKeyspace(),
-                                  controller.getColumnFamily(),
-                                  type,
-                                  bytesRead,
-                                  totalBytes);
-    }
-
     public CloseableIterator<AbstractCompactedRow> iterator()
     {
-        return MergeIterator.get(scanners, ICOMP, new Reducer());
+        return MergeIterator.get(scanners, comparator, new Reducer());
     }
 
     public String toString()
     {
         return this.getCompactionInfo().toString();
     }
-    
+
     protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
     {
         protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
 
         public void reduce(IColumnIterator current)
         {
-            rows.add((SSTableIdentityIterator)current);
+            rows.add((SSTableIdentityIterator) current);
         }
 
         protected AbstractCompactedRow getReduced()
@@ -134,11 +105,13 @@ public class CompactionIterable implemen
                     controller.invalidateCachedRow(compactedRow.key);
                     return null;
                 }
-
-                // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
-                // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
-                // memory on long running instances
-                controller.removeDeletedInCache(compactedRow.key);
+                else
+                {
+                    // If the row is cached, we call removeDeleted on it to have coherent query returns. Otherwise it would look
+                    // like some deleted columns lived longer than gc_grace + compaction. This can also free up a big amount of
+                    // memory on long running instances
+                    controller.removeDeletedInCache(compactedRow.key);
+                }
 
                 return compactedRow;
             }
@@ -147,22 +120,13 @@ public class CompactionIterable implemen
                 rows.clear();
                 if ((row++ % 1000) == 0)
                 {
-                    bytesRead = 0;
+                    long n = 0;
                     for (SSTableScanner scanner : scanners)
-                    {
-                        bytesRead += scanner.getFilePointer();
-                    }
+                        n += scanner.getFilePointer();
+                    bytesRead = n;
                     throttle.throttle(bytesRead);
                 }
             }
         }
     }
-
-    public final static Comparator<IColumnIterator> ICOMP = new Comparator<IColumnIterator>()
-    {
-        public int compare(IColumnIterator i1, IColumnIterator i2)
-        {
-            return i1.getKey().compareTo(i2.getKey());
-        }
-    };
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Sep  7 16:31:52 2011
@@ -699,7 +699,7 @@ public class CompactionManager implement
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = sstable.getDirectScanner(CompactionIterable.FILE_BUFFER_SIZE);
+            SSTableScanner scanner = sstable.getDirectScanner(AbstractCompactionIterable.FILE_BUFFER_SIZE);
             Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
             executor.beginCompaction(ci);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Wed Sep  7 16:31:52 2011
@@ -128,7 +128,9 @@ public class CompactionTask extends Abst
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
-        CompactionIterable ci = new CompactionIterable(type, toCompact, controller); // retain a handle so we can call close()
+        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
+                                      ? new ParallelCompactionIterable(type, toCompact, controller)
+                                      : new CompactionIterable(type, toCompact, controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Wed Sep  7 16:31:52 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.db.compacti
 import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.security.MessageDigest;
 import java.util.Iterator;
 import java.util.List;
@@ -35,8 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.MergeIterator;
@@ -56,7 +57,7 @@ public class LazilyCompactedRow extends 
 {
     private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
 
-    private final List<SSTableIdentityIterator> rows;
+    private final List<? extends ICountableColumnIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
     private final DataOutputBuffer headerBuffer;
@@ -65,8 +66,9 @@ public class LazilyCompactedRow extends 
     private int columnCount;
     private long maxTimestamp;
     private long columnSerializedSize;
+    private boolean closed;
 
-    public LazilyCompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
+    public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
     {
         super(rows.get(0).getKey());
         this.rows = rows;
@@ -96,6 +98,8 @@ public class LazilyCompactedRow extends 
 
     public long write(DataOutput out) throws IOException
     {
+        assert !closed;
+
         DataOutputBuffer clockOut = new DataOutputBuffer();
         ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
 
@@ -119,11 +123,14 @@ public class LazilyCompactedRow extends 
         assert secondPassColumnSize == columnSerializedSize
                : "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
 
+        close();
         return dataSize;
     }
 
     public void update(MessageDigest digest)
     {
+        assert !closed;
+
         // no special-case for rows.size == 1, we're actually skipping some bytes here so just
         // blindly updating everything wouldn't be correct
         DataOutputBuffer out = new DataOutputBuffer();
@@ -144,6 +151,7 @@ public class LazilyCompactedRow extends 
         {
             iter.next().updateDigest(digest);
         }
+        close();
     }
 
     public boolean isEmpty()
@@ -155,8 +163,8 @@ public class LazilyCompactedRow extends 
     public int getEstimatedColumnCount()
     {
         int n = 0;
-        for (SSTableIdentityIterator row : rows)
-            n += row.columnCount;
+        for (ICountableColumnIterator row : rows)
+            n += row.getColumnCount();
         return n;
     }
 
@@ -167,7 +175,7 @@ public class LazilyCompactedRow extends 
 
     public Iterator<IColumn> iterator()
     {
-        for (SSTableIdentityIterator row : rows)
+        for (ICountableColumnIterator row : rows)
             row.reset();
         reducer = new Reducer();
         Iterator<IColumn> iter = MergeIterator.get(rows, getComparator().columnComparator, reducer);
@@ -184,6 +192,22 @@ public class LazilyCompactedRow extends 
         return maxTimestamp;
     }
 
+    private void close()
+    {
+        for (IColumnIterator row : rows)
+        {
+            try
+            {
+                row.close();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+        closed = true;
+    }
+
     private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Wed Sep  7 16:31:52 2011
@@ -0,0 +1,456 @@
+package org.apache.cassandra.db.compaction;
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.utils.*;
+
+/**
+ * A class to run compaction taking advantage of multiple-core processes:
+ *
+ * One Deserializer thread per input sstable performs read + deserialize (a row at a time).
+ * The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
+ *
+ * The merge Reducer creates MergeTasks on a thread-per-core Executor, and returns AsyncPrecompactedRow objects.
+ *
+ * The main complication is in handling larger-than-memory rows.  When one is encountered, no further deserialization
+ * is done until that row is merged and written -- creating a pipeline stall, as it were.  Thus, this is intended
+ * to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional exceptions.
+ */
+public class ParallelCompactionIterable extends AbstractCompactionIterable
+{
+    private static final Logger logger = LoggerFactory.getLogger(ParallelCompactionIterable.class);
+
+    private final List<SSTableScanner> scanners; // one per input sstable; iterator() starts a Deserializer for each
+    private final int maxInMemorySize; // rows whose dataSize exceeds this are merged lazily instead of being deserialized up front
+
+    public ParallelCompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+    { // each input gets an equal share of the configured in-memory limit; max(1, ...) keeps an empty sstable set from dividing by zero
+        this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Math.max(1, Iterables.size(sstables)));
+    }
+
+    public ParallelCompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller, int maxInMemorySize) throws IOException
+    { // variant with a caller-supplied threshold: rows whose dataSize exceeds maxInMemorySize take the lazy path
+        this(type, getScanners(sstables), controller, maxInMemorySize);
+    }
+
+    protected ParallelCompactionIterable(CompactionType type, List<SSTableScanner> scanners, CompactionController controller, int maxInMemorySize)
+    { // target of both public constructors, which build the scanner list via getScanners(sstables)
+        super(controller, type);
+        this.scanners = scanners;
+        this.maxInMemorySize = maxInMemorySize;
+    }
+
+    public CloseableIterator<AbstractCompactedRow> iterator()
+    { // NOTE: every call starts one Deserializer thread per scanner and builds a new Reducer with its own executor
+        List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>();
+        for (SSTableScanner scanner : scanners)
+            sources.add(new Deserializer(scanner, maxInMemorySize)); // the constructor starts the reader thread immediately
+        return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()), controller); // inputs are merged by row key
+    }
+
+    private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
+    { // converts the merge iterator's CompactedRowContainers into AbstractCompactedRows, waiting on pending merge futures
+        private final MergeIterator<RowContainer, CompactedRowContainer> reducer;
+        private final CompactionController controller;
+
+        public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller)
+        {
+            this.reducer = reducer;
+            this.controller = controller;
+        }
+
+        protected AbstractCompactedRow computeNext()
+        {
+            if (!reducer.hasNext())
+                return endOfData();
+
+            CompactedRowContainer container = reducer.next();
+            AbstractCompactedRow compactedRow;
+            try
+            {
+                compactedRow = container.future == null
+                             ? container.row // lazy variant: the LazilyCompactedRow was already built by the Reducer
+                             : new PrecompactedRow(container.key, controller, container.future.get()); // get() blocks until the MergeTask is done
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            if (compactedRow.isEmpty())
+            {
+                controller.invalidateCachedRow(compactedRow.key);
+                return null; // NOTE: an empty row is handed to the consumer as a null element rather than skipped
+            }
+            else
+            {
+                // If the row is cached, we call removeDeleted on it to have coherent query returns. Otherwise it would look
+                // like some deleted columns lived longer than gc_grace + compaction. This can also free up a big amount of
+                // memory on long running instances
+                controller.removeDeletedInCache(compactedRow.key);
+                return compactedRow;
+            }
+        }
+
+        public void close() throws IOException
+        {
+            reducer.close();
+        }
+    }
+
+    private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
+    {
+        private final List<RowContainer> rows = new ArrayList<RowContainer>();
+
+        private final ThreadPoolExecutor executor;
+        private int row = 0;
+
+        private Reducer()
+        {
+            super();
+            // One merge thread per core.  Idle workers expire after a minute: nothing in this file shuts the pool
+            // down, and core threads that never time out would outlive every iterator() call that created them.
+            int threads = Runtime.getRuntime().availableProcessors();
+            executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.SECONDS,
+                                              new SynchronousQueue<Runnable>(), new NamedThreadFactory("CompactionReducer"));
+            executor.allowCoreThreadTimeOut(true);
+            executor.setRejectedExecutionHandler(DebuggableThreadPoolExecutor.blockingExecutionHandler);
+        }
+
+        public void reduce(RowContainer current)
+        { // buffer one input container for the current merge group; getReduced() consumes and clears the buffer
+            rows.add(current);
+        }
+
+        protected CompactedRowContainer getReduced()
+        {
+            assert rows.size() > 0;
+
+            CompactedRowContainer compacted = getCompactedRow(rows);
+            rows.clear(); // safe: getCompactedRow never keeps this list, it copies or converts the elements
+            if ((row++ % 1000) == 0) // on the first merged row and every 1000th after it
+            {
+                long n = 0;
+                for (SSTableScanner scanner : scanners)
+                    n += scanner.getFilePointer();
+                bytesRead = n; // sum of all scanners' file pointers; field not declared in this class (presumably inherited)
+                throttle.throttle(bytesRead);
+            }
+            return compacted;
+        }
+
+        public CompactedRowContainer getCompactedRow(List<RowContainer> rows)
+        {
+            // a single lazily read (wrapper-only) input forces the whole group onto the lazy path
+            boolean anyLazy = false;
+            for (int i = 0; i < rows.size() && !anyLazy; i++)
+                anyLazy = rows.get(i).row == null;
+
+            if (!anyLazy)
+            {
+                // everything is already deserialized: merge on the executor, using a private copy of the list
+                List<RowContainer> snapshot = new ArrayList<RowContainer>(rows);
+                return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(snapshot)));
+            }
+
+            // mixed or fully lazy group: give LazilyCompactedRow one column iterator per input
+            List<ICountableColumnIterator> sources = new ArrayList<ICountableColumnIterator>();
+            for (RowContainer input : rows)
+                sources.add(input.row != null ? new DeserializedColumnIterator(input.row) : input.wrapper);
+            return new CompactedRowContainer(new LazilyCompactedRow(controller, sources));
+        }
+
+        private class MergeTask implements Callable<ColumnFamily>
+        {
+            private final List<RowContainer> rows; // private copy made by getCompactedRow; every element has a non-null row
+
+            public MergeTask(List<RowContainer> rows)
+            {
+                this.rows = rows;
+            }
+
+            public ColumnFamily call() throws Exception
+            {
+                // fold every input into the first non-null ColumnFamily encountered; that instance is
+                // mutated in place rather than copied
+                ColumnFamily merged = null;
+                for (RowContainer input : rows)
+                {
+                    ColumnFamily next = input.row.cf;
+                    if (merged != null)
+                        merged.addAll(next, HeapAllocator.instance);
+                    else
+                        merged = next;
+                }
+
+                // all inputs share one key, so the first container's key identifies the merged row
+                DecoratedKey key = rows.get(0).getKey();
+                return PrecompactedRow.removeDeletedAndOldShards(key, controller, merged);
+            }
+        }
+
+        private class DeserializedColumnIterator implements ICountableColumnIterator
+        { // presents an in-memory Row as an ICountableColumnIterator so it can be merged next to lazily read rows
+            private final Row row;
+            private Iterator<IColumn> iter; // current pass over row.cf; replaced by reset()
+
+            public DeserializedColumnIterator(Row row)
+            {
+                this.row = row;
+                iter = row.cf.iterator();
+            }
+
+            public ColumnFamily getColumnFamily()
+            {
+                return row.cf;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return row.key;
+            }
+
+            public int getColumnCount()
+            {
+                return row.cf.getColumnCount();
+            }
+
+            public void reset()
+            { // restart from the first column; LazilyCompactedRow.iterator() resets every source before merging
+                iter = row.cf.iterator();
+            }
+
+            public void close() throws IOException {} // nothing to release: backed by an in-memory Row
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public IColumn next()
+            {
+                return iter.next();
+            }
+
+            public void remove()
+            { // removal is not supported
+                throw new UnsupportedOperationException();
+            }
+        }
+    }
+
+    private static class Deserializer extends AbstractIterator<RowContainer> implements CloseableIterator<RowContainer>
+    { // reads one sstable on a dedicated thread and hands its rows to the merge through a one-slot queue
+        private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1); // capacity 1: put() blocks until the previous row was taken
+        private static final RowContainer finished = new RowContainer((Row) null); // end-of-stream sentinel, recognised by identity in computeNext()
+        private Condition condition; // set when an oversized row is handed out; read and written only inside the reader runnable
+        private final SSTableScanner scanner;
+
+        public Deserializer(SSTableScanner ssts, final int maxInMemorySize)
+        {
+            this.scanner = ssts;
+            Runnable runnable = new WrappedRunnable()
+            {
+                protected void runMayThrow() throws Exception
+                { // NOTE(review): if this throws, `finished` is never queued and computeNext() would block in take() -- confirm how failures surface
+                    while (true)
+                    {
+                        if (condition != null)
+                            condition.await(); // stall until the last oversized row handed out has been closed
+
+                        if (!scanner.hasNext())
+                        {
+                            queue.put(finished);
+                            break;
+                        }
+
+                        SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
+                        if (iter.dataSize > maxInMemorySize)
+                        { // too large to materialise: pass the sstable iterator itself, wrapped so that closing it signals `condition`
+                            logger.debug("parallel lazy deserialize from {}", iter.getPath()); // parameterized: no per-row string building when debug is off
+                            condition = new SimpleCondition();
+                            queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
+                        }
+                        else
+                        { // small enough: deserialize the whole row on this thread
+                            logger.debug("parallel eager deserialize from {}", iter.getPath());
+                            queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns())));
+                        }
+                    }
+                }
+            };
+            new Thread(runnable, "Deserialize " + scanner.sstable).start();
+        }
+
+        protected RowContainer computeNext()
+        {
+            RowContainer container;
+            try
+            {
+                container = queue.take(); // blocks until the reader thread has produced the next row or the sentinel
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            return container == finished ? endOfData() : container;
+        }
+
+        public void close() throws IOException
+        { // closes the scanner only; NOTE(review): the reader thread is not stopped here -- confirm it cannot still be reading
+            scanner.close();
+        }
+    }
+
+    /**
+     * a wrapper around SSTII that notifies the given condition when it is closed
+     */
+    private static class NotifyingSSTableIdentityIterator implements ICountableColumnIterator
+    {
+        private final SSTableIdentityIterator wrapped; // accessors, reset and iteration all delegate to this iterator
+        private final Condition condition; // signalled by close(); the Deserializer that created it awaits it before reading on
+
+        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, Condition condition)
+        {
+            this.wrapped = wrapped;
+            this.condition = condition;
+        }
+
+        public ColumnFamily getColumnFamily()
+        {
+            return wrapped.getColumnFamily();
+        }
+
+        public DecoratedKey getKey()
+        {
+            return wrapped.getKey();
+        }
+
+        public int getColumnCount()
+        {
+            return wrapped.getColumnCount();
+        }
+
+        public void reset()
+        {
+            wrapped.reset();
+        }
+
+        public void close() throws IOException
+        {
+            try { wrapped.close(); }
+            finally { condition.signal(); } // always release the waiting Deserializer, even if the underlying close fails
+        }
+
+        public boolean hasNext()
+        {
+            return wrapped.hasNext();
+        }
+
+        public IColumn next()
+        {
+            return wrapped.next();
+        }
+
+        public void remove()
+        { // removal is not supported
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class RowContainer
+    {
+        // row is set for an eagerly deserialized row, wrapper for a lazily read one; never both (Deserializer's end marker has neither)
+        public final Row row;
+        public final NotifyingSSTableIdentityIterator wrapper;
+        public static final Comparator<RowContainer> comparator = new Comparator<RowContainer>()
+        {
+            public int compare(RowContainer left, RowContainer right)
+            {
+                return left.getKey().compareTo(right.getKey());
+            }
+        };
+
+        private RowContainer(Row row)
+        {
+            wrapper = null;
+            this.row = row;
+        }
+
+        public RowContainer(NotifyingSSTableIdentityIterator wrapper)
+        {
+            row = null;
+            this.wrapper = wrapper;
+        }
+
+        public DecoratedKey getKey()
+        {
+            return row != null ? row.key : wrapper.getKey();
+        }
+    }
+
+    private static class CompactedRowContainer
+    {
+        public final DecoratedKey key; // set only together with future; the lazy variant leaves it null
+        /** exactly one of "future" and "row" is non-null. */
+        public final Future<ColumnFamily> future;
+        public final LazilyCompactedRow row;
+
+        private CompactedRowContainer(DecoratedKey key, Future<ColumnFamily> future)
+        {
+            row = null;
+            this.future = future;
+            this.key = key;
+        }
+
+        private CompactedRowContainer(LazilyCompactedRow row)
+        {
+            key = null;
+            future = null;
+            this.row = row;
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Wed Sep  7 16:31:52 2011
@@ -56,6 +56,14 @@ public class PrecompactedRow extends Abs
         this.gcBefore = Integer.MAX_VALUE;
     }
 
+    /** Wraps the given cf unchanged: it is the caller's responsibility to apply removeDeleted + removeOldShards to the cf before calling this constructor. */
+    public PrecompactedRow(DecoratedKey key, CompactionController controller, ColumnFamily cf)
+    {
+        super(key);
+        this.gcBefore = controller.gcBefore;
+        compactedCf = cf; // stored as-is; nothing is purged or copied here
+    }
+
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
     {
         return removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf);
@@ -71,9 +79,9 @@ public class PrecompactedRow extends Abs
 
     public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
     {
-        super(rows.get(0).getKey());
-        gcBefore = controller.gcBefore;
-        compactedCf = removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows));
+        this(rows.get(0).getKey(),
+             controller,
+             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows)));
     }
 
     private static ColumnFamily merge(List<SSTableIdentityIterator> rows)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Sep  7 16:31:52 2011
@@ -31,11 +31,12 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.BytesReadTracker;
 
-public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, ICountableColumnIterator
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
 
@@ -46,7 +47,7 @@ public class SSTableIdentityIterator imp
     public final boolean fromRemote;
 
     private final ColumnFamily columnFamily;
-    public final int columnCount;
+    private final int columnCount;
     private long columnPosition;
 
     private BytesReadTracker inputWithTracker; // tracks bytes read
@@ -271,4 +272,9 @@ public class SSTableIdentityIterator imp
         }
         inputWithTracker.reset(headerSize());
     }
+
+    public int getColumnCount()
+    { // accessor for the private columnCount field; callers go through ICountableColumnIterator
+        return columnCount;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Sep  7 16:31:52 2011
@@ -21,25 +21,23 @@ package org.apache.cassandra.io;
  */
 
 
-import static junit.framework.Assert.assertEquals;
-
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import org.junit.Test;
+
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
@@ -48,30 +46,54 @@ import org.apache.cassandra.io.util.Data
 import org.apache.cassandra.io.util.MappedFileDataInput;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
-import org.junit.Test;
+import static junit.framework.Assert.assertEquals;
 
 
 public class LazilyCompactedRowTest extends CleanupHelper
 {
-    private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
+    private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
-        Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
+
+        // compare eager and lazy compactions
+        AbstractCompactionIterable eager = new CompactionIterable(CompactionType.UNKNOWN,
+                                                                  sstables,
+                                                                  new PreCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable lazy = new CompactionIterable(CompactionType.UNKNOWN,
+                                                                 sstables,
+                                                                 new LazilyCompactingController(cfs, sstables, gcBefore, false));
+        assertBytes(sstables, eager, lazy);
+
+        // compare eager and parallel-lazy compactions
+        eager = new CompactionIterable(CompactionType.UNKNOWN,
+                                       sstables,
+                                       new PreCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable parallel = new ParallelCompactionIterable(CompactionType.UNKNOWN,
+                                                                             sstables,
+                                                                             new CompactionController(cfs, sstables, gcBefore, false),
+                                                                             0);
+        assertBytes(sstables, eager, parallel);
+    }
+
+    private static void assertBytes(Collection<SSTableReader> sstables, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
+    {
+        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
+        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
 
         while (true)
         {
-            if (!ci1.hasNext())
+            if (!iter1.hasNext())
             {
-                assert !ci2.hasNext();
+                assert !iter2.hasNext();
                 break;
             }
 
-            AbstractCompactedRow row1 = ci1.next();
-            AbstractCompactedRow row2 = ci2.next();
+            AbstractCompactedRow row1 = iter1.next();
+            AbstractCompactedRow row2 = iter2.next();
             DataOutputBuffer out1 = new DataOutputBuffer();
             DataOutputBuffer out2 = new DataOutputBuffer();
             row1.write(out1);
@@ -94,8 +116,8 @@ public class LazilyCompactedRowTest exte
             // row size can differ b/c of bloom filter counts being different
             long rowSize1 = SSTableReader.readRowSize(in1, sstables.iterator().next().descriptor);
             long rowSize2 = SSTableReader.readRowSize(in2, sstables.iterator().next().descriptor);
-            assertEquals(out1.getLength(), rowSize1 + 8);
-            assertEquals(out2.getLength(), rowSize2 + 8);
+            assertEquals(rowSize1 + 8, out1.getLength());
+            assertEquals(rowSize2 + 8, out2.getLength());
             // bloom filter
             IndexHelper.defreezeBloomFilter(in1, rowSize1, false);
             IndexHelper.defreezeBloomFilter(in2, rowSize2, false);
@@ -115,7 +137,7 @@ public class LazilyCompactedRowTest exte
             ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf1, in1);
             ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf2, in2);
             assert cf1.getLocalDeletionTime() == cf2.getLocalDeletionTime();
-            assert cf1.getMarkedForDeleteAt() == cf2.getMarkedForDeleteAt();   
+            assert cf1.getMarkedForDeleteAt() == cf2.getMarkedForDeleteAt();
             // columns
             int columns = in1.readInt();
             assert columns == in2.readInt();
@@ -130,23 +152,25 @@ public class LazilyCompactedRowTest exte
             assert in2.available() == 0;
         }
     }
-    
+
     private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException
     {
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
-        Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
+        AbstractCompactionIterable ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
+        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
+        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
 
         while (true)
         {
-            if (!ci1.hasNext())
+            if (!iter1.hasNext())
             {
-                assert !ci2.hasNext();
+                assert !iter2.hasNext();
                 break;
             }
 
-            AbstractCompactedRow row1 = ci1.next();
-            AbstractCompactedRow row2 = ci2.next();
+            AbstractCompactedRow row1 = iter1.next();
+            AbstractCompactedRow row2 = iter2.next();
             MessageDigest digest1 = MessageDigest.getInstance("MD5");
             MessageDigest digest2 = MessageDigest.getInstance("MD5");