Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:36 UTC

[08/10] git commit: Remove multithreaded compaction and PCR patch by jbellis; reviewed by marcuse for CASSANDRA-6142

Remove multithreaded compaction and PCR
patch by jbellis; reviewed by marcuse for CASSANDRA-6142


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36cdf34b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36cdf34b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36cdf34b

Branch: refs/heads/trunk
Commit: 36cdf34bd92ede5ad99447e10d90e6caa1fd743a
Parents: ba3c1bc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:17:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:17:23 2013 -0500

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   7 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |   7 -
 .../org/apache/cassandra/db/ColumnIndex.java    |  19 +-
 .../db/compaction/AbstractCompactedRow.java     |   4 +-
 .../db/compaction/CompactionController.java     |  58 +--
 .../db/compaction/CompactionIterable.java       |   9 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../cassandra/db/compaction/CompactionTask.java |   4 +-
 .../db/compaction/LazilyCompactedRow.java       |  91 +++--
 .../compaction/ParallelCompactionIterable.java  | 403 -------------------
 .../db/compaction/PrecompactedRow.java          | 202 ----------
 .../db/compaction/SSTableSplitter.java          |   4 +-
 .../cassandra/db/compaction/Scrubber.java       |  56 ++-
 .../cassandra/db/compaction/Upgrader.java       |   4 +-
 .../io/sstable/IndexSummaryBuilder.java         |   2 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 .../db/compaction/CompactionsPurgeTest.java     |  32 +-
 .../cassandra/io/LazilyCompactedRowTest.java    | 319 ---------------
 .../apache/cassandra/repair/ValidatorTest.java  |  39 +-
 21 files changed, 173 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 455421a..d6b5b7a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -443,13 +443,6 @@ in_memory_compaction_limit_in_mb: 64
 # Uncomment to make compaction mono-threaded, the pre-0.8 default.
 #concurrent_compactors: 1
 
-# Multi-threaded compaction. When enabled, each compaction will use
-# up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise, 
-# your concern is usually to get compaction to do LESS i/o (see:
-# compaction_throughput_mb_per_sec), not more.
-multithreaded_compaction: false
-
 # Throttles compaction to the given total throughput across the entire
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to

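With multithreaded_compaction removed, the surviving tuning knobs are concurrent_compactors (which, per the Config.java hunk below, defaults to the number of available processors) and compaction_throughput_mb_per_sec. A minimal cassandra.yaml excerpt, with illustrative values rather than recommendations:

    # one compactor per core by default; lower it to reduce contention
    concurrent_compactors: 2

    # total compaction throughput across the node; 0 disables throttling
    compaction_throughput_mb_per_sec: 16
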
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8f0f22e..e9f48e6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -109,7 +109,6 @@ public class Config
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;
-    public Boolean multithreaded_compaction = false;
 
     public Integer max_streaming_retries = 3;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 90f1753..1997347 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -382,8 +382,6 @@ public class DatabaseDescriptor
             logger.debug("setting auto_bootstrap to {}", conf.auto_bootstrap);
         }
 
-        logger.info("{}using multi-threaded compaction", (conf.multithreaded_compaction ? "" : "Not "));
-
         if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
         {
             throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
@@ -856,11 +854,6 @@ public class DatabaseDescriptor
         return conf.concurrent_compactors;
     }
 
-    public static boolean isMultithreadedCompaction()
-    {
-        return conf.multithreaded_compaction;
-    }
-
     public static int getCompactionThroughputMbPerSec()
     {
         return conf.compaction_throughput_mb_per_sec;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..6dd2028 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,19 +140,26 @@ public class ColumnIndex
             }
             ColumnIndex index = build();
 
-            finish();
+            maybeWriteEmptyRowHeader();
 
             return index;
         }
 
-        public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+        /**
+         * The important distinction wrt build() is that we may be building for a row that ends up
+         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+         * columns shadowed by expired tombstones).  Thus, it is the caller's responsibility
+         * to decide whether to write the header for an empty row.
+         */
+        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
         {
-            for (OnDiskAtom c : columns)
+            while (columns.hasNext())
+            {
+                OnDiskAtom c = columns.next();
                 add(c);
+            }
             ColumnIndex index = build();
 
-            finish();
-
             return index;
         }
 
@@ -219,7 +226,7 @@ public class ColumnIndex
             return result;
         }
 
-        public void finish() throws IOException
+        public void maybeWriteEmptyRowHeader() throws IOException
         {
             if (!deletionInfo.isLive())
                 maybeWriteRowHeader();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 

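The revised contract above is what lets a fully-expired row disappear during compaction: write() signals "nothing written" by returning null instead of requiring callers to pre-check emptiness. A hypothetical caller sketch; the writer plumbing (getFilePointer, indexWriter) is assumed, not part of this commit:

    long startPosition = out.getFilePointer();        // assumed accessor
    RowIndexEntry entry = row.write(startPosition, out);
    if (entry == null)
    {
        // the row reduced to expired tombstones; nothing was written,
        // so no index entry is recorded and the key simply drops out
        return null;
    }
    indexWriter.append(row.key, entry);               // hypothetical index hook
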
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..d7c2e4d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
@@ -51,9 +50,6 @@ public class CompactionController
     public final int gcBefore;
     public final int mergeShardBefore;
 
-    /**
-     * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
-     */
     protected CompactionController(ColumnFamilyStore cfs, int maxValue)
     {
         this(cfs, null, maxValue);
@@ -153,25 +149,24 @@ public class CompactionController
     }
 
     /**
-     * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
-     * are included in the compaction set
+     * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
+     * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
+     * in other sstables.
      */
-    public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+    public long maxPurgeableTimestamp(DecoratedKey key)
     {
         List<SSTableReader> filteredSSTables = overlappingTree.search(key);
+        long min = Long.MAX_VALUE;
         for (SSTableReader sstable : filteredSSTables)
         {
-            if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
-            {
-                // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
-                // we check index file instead.
-                if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
-                    return false;
-                else if (sstable.getBloomFilter().isPresent(key.key))
-                    return false;
-            }
+            // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
+            // we check index file instead.
+            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+                min = Math.min(min, sstable.getMinTimestamp());
+            else if (sstable.getBloomFilter().isPresent(key.key))
+                min = Math.min(min, sstable.getMinTimestamp());
         }
-        return true;
+        return min;
     }
 
     public void invalidateCachedRow(DecoratedKey key)
@@ -179,35 +174,6 @@ public class CompactionController
         cfs.invalidateCachedRow(key);
     }
 
-    /**
-     * @return an AbstractCompactedRow implementation to write the merged rows in question.
-     *
-     * If there is a single source row, the data is from a current-version sstable, we don't
-     * need to purge and we aren't forcing deserialization for scrub, write it unchanged.
-     * Otherwise, we deserialize, purge tombstones, and reserialize in the latest version.
-     */
-    public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-    {
-        long rowSize = 0;
-        for (SSTableIdentityIterator row : rows)
-            rowSize += row.dataSize;
-
-        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-        {
-            String keyString = cfs.metadata.getKeyValidator().getString(rows.get(0).getKey().key);
-            logger.info(String.format("Compacting large row %s/%s:%s (%d bytes) incrementally",
-                                      cfs.keyspace.getName(), cfs.name, keyString, rowSize));
-            return new LazilyCompactedRow(this, rows);
-        }
-        return new PrecompactedRow(this, rows);
-    }
-
-    /** convenience method for single-sstable compactions */
-    public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row)
-    {
-        return getCompactedRow(Collections.singletonList(row));
-    }
-
     public void close()
     {
         SSTableReader.releaseReferences(overlappingSSTables);

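The boolean shouldPurge thus becomes a timestamp bound: a tombstone may be dropped only if it is older than every overlapping sstable that might still contain the key. A sketch of the call-site pattern this enables (mirroring the LazilyCompactedRow hunk further down; variable names are placeholders):

    // min timestamp across overlapping sstables that may contain 'key'
    long maxPurgeable = controller.maxPurgeableTimestamp(key);

    // a tombstone can be gc'd only if nothing it shadows could still
    // be sitting in an sstable outside this compaction
    boolean shouldPurge = tombstone.timestamp() < maxPurgeable;
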
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 866907b..132cf25 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -28,7 +30,6 @@ import org.apache.cassandra.utils.MergeIterator;
 
 public class CompactionIterable extends AbstractCompactionIterable
 {
-
     private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>()
     {
         public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
@@ -54,11 +55,11 @@ public class CompactionIterable extends AbstractCompactionIterable
 
     protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow>
     {
-        protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+        protected final List<OnDiskAtomIterator> rows = new ArrayList<>();
 
         public void reduce(OnDiskAtomIterator current)
         {
-            rows.add((SSTableIdentityIterator) current);
+            rows.add(current);
         }
 
         protected AbstractCompactedRow getReduced()
@@ -71,7 +72,7 @@ public class CompactionIterable extends AbstractCompactionIterable
                 // create a new container for rows, since we're going to clear ours for the next one,
                 // and the AbstractCompactionRow code should be able to assume that the collection it receives
                 // won't be pulled out from under it.
-                return controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
+                return new LazilyCompactedRow(controller, ImmutableList.copyOf(rows));
             }
             finally
             {

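ImmutableList.copyOf preserves the defensive copy that new ArrayList<>(rows) provided (the finally block clears rows for the next key) while additionally making the handed-off snapshot unmodifiable. A minimal illustration, assuming Guava's ImmutableList as imported above:

    List<OnDiskAtomIterator> snapshot = ImmutableList.copyOf(rows);
    rows.clear();   // reducer reuses 'rows' for the next key; snapshot unaffected
    // snapshot.add(...) would now throw UnsupportedOperationException
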
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b14c313..966e471 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -579,7 +579,7 @@ public class CompactionManager implements CompactionManagerMBean
                     row = cleanupStrategy.cleanup(row);
                     if (row == null)
                         continue;
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
                     if (writer.append(compactedRow) != null)
                         totalkeysWritten++;
                 }
@@ -905,7 +905,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
             /*
              * The main reason we always purge is that including gcable tombstone would mean that the
@@ -918,7 +918,7 @@ public class CompactionManager implements CompactionManagerMBean
              * a tombstone that could shadow a column in another sstable, but this is doubly not a concern
              * since validation compaction is read-only.
              */
-            return true;
+            return Long.MAX_VALUE;
         }
     }
 

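With the purge test phrased as "timestamp < maxPurgeableTimestamp(key)", the sentinel values compose naturally; compare this override with the SSTableSplitter one further down in this diff. A sketch of the two degenerate cases (t is a placeholder timestamp):

    long t = someTombstoneTimestamp;                     // placeholder
    boolean purgeDuringValidation = t < Long.MAX_VALUE;  // effectively always
    boolean purgeDuringSplit      = t < Long.MIN_VALUE;  // never
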
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1ea18e9..bb07be8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -123,9 +123,7 @@ public class CompactionTask extends AbstractCompactionTask
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
-                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
+        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..2cb014a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -47,18 +46,18 @@ import org.apache.cassandra.utils.StreamingHistogram;
  * in memory at a time is the bloom filter, the index, and one column from each
  * pre-compaction row.
  */
-public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
+public class LazilyCompactedRow extends AbstractCompactedRow
 {
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
-    private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
-    private Reducer reducer;
+    private final long maxPurgeableTimestamp;
+    private final ColumnFamily emptyColumnFamily;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private final Reducer reducer;
+    private final Iterator<OnDiskAtom> merger;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +66,39 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+        maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+
+        reducer = new Reducer();
+        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+    }
+
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+    {
+        // We should only gc tombstones if shouldPurge == true. But otherwise,
+        // it is still ok to collect columns that are shadowed by their (deleted)
+        // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
+        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
+                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+                                                                 controller.cfs.indexManager.updaterFor(key));
+        if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
+            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
+        return compacted;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,12 +109,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(merger);
             if (columnsIndex.columnsIndex.isEmpty())
             {
-                boolean cfIrrelevant = shouldPurge
-                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+                boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
+                                     ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+                                     : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
                 if (cfIrrelevant)
                     return null;
             }
@@ -104,17 +124,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             throw new RuntimeException(e);
         }
         // reach into the reducer (created during iteration) to get column count, size, max column timestamp
-        // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
-        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
-                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
-                                      reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
-                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
-                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
-                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
+        columnStats = new ColumnStats(reducer.columns,
+                                      reducer.minTimestampSeen,
+                                      Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
+                                      reducer.maxLocalDeletionTimeSeen,
+                                      reducer.tombstones,
+                                      reducer.minColumnNameSeen,
+                                      reducer.maxColumnNameSeen
         );
-        reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
@@ -142,24 +161,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
         // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
         indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-        Iterator<OnDiskAtom> iter = iterator();
-        while (iter.hasNext())
-            iter.next().updateDigest(digest);
+        while (merger.hasNext())
+            merger.next().updateDigest(digest);
         close();
     }
 
-    public AbstractType<?> getComparator()
-    {
-        return emptyColumnFamily.getComparator();
-    }
-
-    public Iterator<OnDiskAtom> iterator()
-    {
-        reducer = new Reducer();
-        Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
-        return Iterators.filter(iter, Predicates.notNull());
-    }
-
     public ColumnStats columnStats()
     {
         return columnStats;
@@ -239,7 +245,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
             else
             {
-                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+                boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+                ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {
                     container.clear();
@@ -248,7 +255,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 Column reduced = purged.iterator().next();
                 container.clear();
 
-                // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
+                // removeDeletedAndOldShards has only checked the top-level CF deletion times,
                 // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                 if (indexBuilder.tombstoneTracker().isDeleted(reduced))
                 {

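The key move in this hunk is the overridden gcBefore: when the row's newest deletion is not yet purgeable, passing Integer.MIN_VALUE retains every tombstone but still lets removeDeleted drop columns shadowed by row- and range-level deletions. Restated as a standalone sketch, with names taken from the constructor above:

    int gc = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
           ? controller.gcBefore     // safe to gc expired tombstones
           : Integer.MIN_VALUE;      // keep tombstones, still drop shadowed data
    ColumnFamily purged = ColumnFamilyStore.removeDeleted(rawCf, gc);
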
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
deleted file mode 100644
index 8a74fea..0000000
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.*;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.utils.*;
-
-/**
- * A class to run compaction taking advantage of multiple-core processes:
- *
- * One Deserializer thread per input sstable performs read + deserialize (a row at a time).
- * The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
- *
- * The merge Reducer creates MergeTasks on a thread-per-core Executor, and returns AsyncPrecompactedRow objects.
- *
- * The main complication is in handling larger-than-memory rows.  When one is encountered, no further deserialization
- * is done until that row is merged and written -- creating a pipeline stall, as it were.  Thus, this is intended
- * to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional exceptions.
- */
-public class ParallelCompactionIterable extends AbstractCompactionIterable
-{
-    private static final Logger logger = LoggerFactory.getLogger(ParallelCompactionIterable.class);
-
-    private final int maxInMemorySize;
-
-    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
-    {
-        this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
-    }
-
-    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize)
-    {
-        super(controller, type, scanners);
-        this.maxInMemorySize = maxInMemorySize;
-    }
-
-    public CloseableIterator<AbstractCompactedRow> iterator()
-    {
-        List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size());
-        for (ICompactionScanner scanner : scanners)
-            sources.add(new Deserializer(scanner, maxInMemorySize));
-        return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()));
-    }
-
-    private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
-    {
-        private final CloseableIterator<CompactedRowContainer> reducer;
-
-        public Unwrapper(CloseableIterator<CompactedRowContainer> reducer)
-        {
-            this.reducer = reducer;
-        }
-
-        protected AbstractCompactedRow computeNext()
-        {
-            if (!reducer.hasNext())
-                return endOfData();
-
-            CompactedRowContainer container = reducer.next();
-            AbstractCompactedRow compactedRow;
-            compactedRow = container.future == null
-                         ? container.row
-                         : new PrecompactedRow(container.key, FBUtilities.waitOnFuture(container.future));
-
-            return compactedRow;
-        }
-
-        public void close() throws IOException
-        {
-            reducer.close();
-        }
-    }
-
-    private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
-    {
-        private final List<RowContainer> rows = new ArrayList<RowContainer>();
-
-        private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
-                                                                                     Integer.MAX_VALUE,
-                                                                                     TimeUnit.MILLISECONDS,
-                                                                                     new SynchronousQueue<Runnable>(),
-                                                                                     new NamedThreadFactory("CompactionReducer"));
-
-        public void reduce(RowContainer current)
-        {
-            rows.add(current);
-        }
-
-        protected CompactedRowContainer getReduced()
-        {
-            assert rows.size() > 0;
-
-            ParallelCompactionIterable.this.updateCounterFor(rows.size());
-            CompactedRowContainer compacted = getCompactedRow(rows);
-            rows.clear();
-            long n = 0;
-            for (ICompactionScanner scanner : scanners)
-                n += scanner.getCurrentPosition();
-            bytesRead = n;
-            return compacted;
-        }
-
-        public CompactedRowContainer getCompactedRow(List<RowContainer> rows)
-        {
-            boolean inMemory = true;
-            for (RowContainer container : rows)
-            {
-                if (container.row == null)
-                {
-                    inMemory = false;
-                    break;
-                }
-            }
-
-            if (inMemory)
-            {
-                // caller will re-use rows List, so make ourselves a copy
-                List<Row> rawRows = new ArrayList<Row>(rows.size());
-                for (RowContainer rowContainer : rows)
-                    rawRows.add(rowContainer.row);
-                return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
-            }
-
-            List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
-            for (RowContainer container : rows)
-                iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
-            return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
-        }
-
-        public void close()
-        {
-            executor.shutdown();
-        }
-
-        /**
-         * Merges a set of in-memory rows
-         */
-        private class MergeTask implements Callable<ColumnFamily>
-        {
-            private final List<Row> rows;
-
-            public MergeTask(List<Row> rows)
-            {
-                this.rows = rows;
-            }
-
-            public ColumnFamily call() throws Exception
-            {
-                final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
-                List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size());
-                for (Row row : rows)
-                {
-                    returnCF.delete(row.cf);
-                    data.add(FBUtilities.closeableIterator(row.cf.iterator()));
-                }
-
-                PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
-                return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
-            }
-        }
-
-        private class DeserializedColumnIterator implements OnDiskAtomIterator
-        {
-            private final Row row;
-            private final Iterator<Column> iter;
-
-            public DeserializedColumnIterator(Row row)
-            {
-                this.row = row;
-                iter = row.cf.iterator();
-            }
-
-            public ColumnFamily getColumnFamily()
-            {
-                return row.cf;
-            }
-
-            public DecoratedKey getKey()
-            {
-                return row.key;
-            }
-
-            public void close() throws IOException {}
-
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public OnDiskAtom next()
-            {
-                return iter.next();
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        }
-    }
-
-    private static class Deserializer extends AbstractIterator<RowContainer> implements CloseableIterator<RowContainer>
-    {
-        private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
-        private static final RowContainer finished = new RowContainer((Row) null);
-        private final ICompactionScanner scanner;
-
-        public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
-        {
-            this.scanner = ssts;
-            Runnable runnable = new WrappedRunnable()
-            {
-                protected void runMayThrow() throws Exception
-                {
-                    SimpleCondition condition = null;
-                    while (true)
-                    {
-                        if (condition != null)
-                        {
-                            condition.await();
-                            condition = null;
-                        }
-                        if (!scanner.hasNext())
-                        {
-                            queue.put(finished);
-                            break;
-                        }
-
-                        SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
-                        if (iter.dataSize > maxInMemorySize)
-                        {
-                            logger.debug("parallel lazy deserialize from {}", iter.getPath());
-                            condition = new SimpleCondition();
-                            queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
-                        }
-                        else
-                        {
-                            logger.debug("parallel eager deserialize from {}", iter.getPath());
-                            queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory))));
-                        }
-                    }
-                }
-            };
-            new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start();
-        }
-
-        protected RowContainer computeNext()
-        {
-            RowContainer container;
-            try
-            {
-                container = queue.take();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-            return container == finished ? endOfData() : container;
-        }
-
-        public void close() throws IOException
-        {
-            scanner.close();
-        }
-    }
-
-    /**
-     * a wrapper around SSTII that notifies the given condition when it is closed
-     */
-    private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
-    {
-        private final SSTableIdentityIterator wrapped;
-        private final SimpleCondition condition;
-
-        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition)
-        {
-            this.wrapped = wrapped;
-            this.condition = condition;
-        }
-
-        public ColumnFamily getColumnFamily()
-        {
-            return wrapped.getColumnFamily();
-        }
-
-        public DecoratedKey getKey()
-        {
-            return wrapped.getKey();
-        }
-
-        public void close() throws IOException
-        {
-            try
-            {
-                wrapped.close();
-            }
-            finally
-            {
-                condition.signalAll();
-            }
-        }
-
-        public boolean hasNext()
-        {
-            return wrapped.hasNext();
-        }
-
-        public OnDiskAtom next()
-        {
-            return wrapped.next();
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static class RowContainer
-    {
-        // either row is not null, or wrapper is not null.  But not both.
-        public final Row row;
-        public final NotifyingSSTableIdentityIterator wrapper;
-        public static final Comparator<RowContainer> comparator = new Comparator<RowContainer>()
-        {
-            public int compare(RowContainer o1, RowContainer o2)
-            {
-                return o1.getKey().compareTo(o2.getKey());
-            }
-        };
-
-        private RowContainer(Row row)
-        {
-            this.row = row;
-            wrapper = null;
-        }
-
-        public RowContainer(NotifyingSSTableIdentityIterator wrapper)
-        {
-            this.wrapper = wrapper;
-            row = null;
-        }
-
-        public DecoratedKey getKey()
-        {
-            return row == null ? wrapper.getKey() : row.key;
-        }
-    }
-
-    private static class CompactedRowContainer
-    {
-        public final DecoratedKey key;
-        /** either "future" or "row" will be not-null, but not both at once. */
-        public final Future<ColumnFamily> future;
-        public final LazilyCompactedRow row;
-
-        private CompactedRowContainer(DecoratedKey key, Future<ColumnFamily> future)
-        {
-            this.key = key;
-            this.future = future;
-            row = null;
-        }
-
-        private CompactedRowContainer(LazilyCompactedRow row)
-        {
-            this.row = row;
-            future = null;
-            key = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
deleted file mode 100644
index 15ae0b8..0000000
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MergeIterator;
-
-/**
- * PrecompactedRow merges its rows in its constructor in memory.
- */
-public class PrecompactedRow extends AbstractCompactedRow
-{
-    private final ColumnFamily compactedCf;
-
-    /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
-    public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
-    {
-        super(key);
-        compactedCf = cf;
-    }
-
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
-    {
-        assert key != null;
-        assert controller != null;
-        assert cf != null;
-
-        // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
-        // gets behind and has hundreds of overlapping L0 sstables.  Essentially, this method is an
-        // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
-        // taking this into account.
-        Boolean shouldPurge = null;
-
-        if (cf.hasIrrelevantData(controller.gcBefore))
-            shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());
-
-        // We should only gc tombstone if shouldPurge == true. But otherwise,
-        // it is still ok to collect column that shadowed by their (deleted)
-        // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
-
-        if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-        {
-            if (shouldPurge == null)
-                shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
-            if (shouldPurge)
-                CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        }
-
-        return compacted;
-    }
-
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
-    {
-        // See comment in preceding method
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
-                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                 controller.cfs.indexManager.updaterFor(key));
-        if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        return compacted;
-    }
-
-    public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
-    {
-        this(rows.get(0).getKey(),
-             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
-    }
-
-    private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)
-    {
-        assert !rows.isEmpty();
-
-        final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
-        // transform into iterators that MergeIterator will like, and apply row-level tombstones
-        List<CloseableIterator<Column>> data = new ArrayList<>(rows.size());
-        for (SSTableIdentityIterator row : rows)
-        {
-            ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory);
-            returnCF.delete(cf);
-            data.add(FBUtilities.closeableIterator(cf.iterator()));
-        }
-
-        merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));
-
-        return returnCF;
-    }
-
-    // returnCF should already have row-level tombstones applied
-    public static void merge(final ColumnFamily returnCF, List<CloseableIterator<Column>> data, final SecondaryIndexManager.Updater indexer)
-    {
-        IDiskAtomFilter filter = new IdentityQueryFilter();
-        Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
-
-        MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>()
-        {
-            ColumnFamily container = returnCF.cloneMeShallow();
-
-            public void reduce(Column column)
-            {
-                container.addColumn(column);
-
-                // skip the index-update checks if there is no indexing needed since they are a bit expensive
-                if (indexer == SecondaryIndexManager.nullUpdater)
-                    return;
-
-                // notify the index that the column has been overwritten if the value being reduced has been
-                // superceded by another directly, or indirectly by a range tombstone
-                if ((!column.isMarkedForDelete(System.currentTimeMillis()) && !container.getColumn(column.name()).equals(column))
-                    || returnCF.deletionInfo().isDeleted(column.name(), CompactionManager.NO_GC))
-                {
-                    indexer.remove(column);
-                }
-            }
-
-            protected Column getReduced()
-            {
-                Column c = container.iterator().next();
-                container.clear();
-                return c;
-            }
-        };
-
-        Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
-        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
-    }
-
-    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
-    {
-        if (compactedCf == null)
-            return null;
-
-        return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
-    }
-
-    public void update(MessageDigest digest)
-    {
-        assert compactedCf != null;
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            DeletionTime.serializer.serialize(compactedCf.deletionInfo().getTopLevelDeletion(), buffer);
-            digest.update(buffer.getData(), 0, buffer.getLength());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        compactedCf.updateDigest(digest);
-    }
-
-    public ColumnStats columnStats()
-    {
-        return compactedCf.getColumnStats();
-    }
-
-    /**
-     * @return the full column family represented by this compacted row.
-     *
-     * We do not provide this method for other AbstractCompactedRow, because this fits the whole row into
-     * memory and don't make sense for those other implementations.
-     */
-    public ColumnFamily getFullColumnFamily()
-    {
-        return compactedCf;
-    }
-
-    public void close() { }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index d0aafa4..a14ab43 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -96,9 +96,9 @@ public class SSTableSplitter {
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e435c24..97daff4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         public int compare(Row r1, Row r2)
          {
              return r1.key.compareTo(r2.key);
          }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                     if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
         if (!outOfOrderRows.isEmpty())
         {
             SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-            for (AbstractCompactedRow row : outOfOrderRows)
-                inOrderWriter.append(row);
+            for (Row row : outOfOrderRows)
+                inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
             outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
         }
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+        // TODO warn if the row is too large?  if it is, there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is the worst case for TreeMapBackedSortedColumns, but we shouldn't need to do this very often
+        // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;
@@ -356,9 +370,9 @@ public class Scrubber implements Closeable
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }
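
The saveOutOfOrderRow path above materializes a mis-ordered row into a TreeMap-backed column family precisely because that container accepts cells in any insertion order, where an append-oriented container would require comparator order. A hedged illustration (the Column constructor and addColumn calls reflect the 2.0 API as used elsewhere in the tree; treat this as a sketch, not patch code):

    // Cells added out of comparator order are still iterated in order.
    ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
    cf.addColumn(new Column(ByteBufferUtil.bytes("b"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0));
    cf.addColumn(new Column(ByteBufferUtil.bytes("a"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0));
    for (Column c : cf)
        System.out.println(c.name()); // visits "a" before "b"

Collecting the recovered rows in a SortedSet<Row> then lets the final loop append them to the in-order writer with no further sorting.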

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index fa21765..b98c2ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -151,9 +151,9 @@ public class Upgrader
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 1fa2912..e7b9e11 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -71,7 +71,7 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
-        assert keys != null && keys.size() > 0;
+        assert keys.size() > 0;
         assert keys.size() == positions.size();
 
         Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This write the atom on disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..18e637b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -39,6 +39,8 @@ import org.apache.cassandra.Util;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.junit.Assert.assertFalse;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 
@@ -144,45 +146,50 @@ public class CompactionsPurgeTest extends SchemaLoader
         // verify that minor compaction still GC when key is present
         // in a non-compacted sstable but the timestamp ensures we won't miss anything
         cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
-        Assert.assertEquals(1, cf.getColumnCount());
+        assertEquals(1, cf.getColumnCount());
     }
 
+    /**
+     * verify that a minor compaction does not drop tombstones that might still be relevant
+     */
     @Test
     public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException
     {
-        // verify that we don't drop tombstones during a minor compaction that might still be relevant
         CompactionManager.instance.disableAutoCompaction();
+
         Keyspace keyspace = Keyspace.open(KEYSPACE2);
         String cfName = "Standard1";
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
         RowMutation rm;
         DecoratedKey key3 = Util.dk("key3");
+
         // inserts
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.add(cfName, ByteBufferUtil.bytes("c1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
         rm.add(cfName, ByteBufferUtil.bytes("c2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
         rm.apply();
         cfs.forceBlockingFlush();
-        // deletes
+        // delete c1
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.delete(cfName, ByteBufferUtil.bytes("c1"), 10);
         rm.apply();
         cfs.forceBlockingFlush();
         Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
 
-        // delete so we have new delete in a diffrent SST.
+        // delete c2 so we have a new delete in a different SSTable
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.delete(cfName, ByteBufferUtil.bytes("c2"), 9);
         rm.apply();
         cfs.forceBlockingFlush();
+
+        // compact the sstables with the c1/c2 data and the c1 tombstone
         cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
 
-        // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
-        // so it would be invalid to assume we can throw out the c1 entry.
+        // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
+        // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
-        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
-        Assert.assertEquals(2, cf.getColumnCount());
+        assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+        assertEquals(2, cf.getColumnCount());
     }
 
     @Test
@@ -279,16 +286,13 @@ public class CompactionsPurgeTest extends SchemaLoader
         String cfName = "Standard1";
         Keyspace keyspace = Keyspace.open(keyspaceName);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
         DecoratedKey key = Util.dk("key3");
         RowMutation rm;
 
         // inserts
         rm = new RowMutation(keyspaceName, key.key);
         for (int i = 0; i < 10; i++)
-        {
             rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
-        }
         rm.apply();
 
         // deletes row with timestamp such that not all columns are deleted
@@ -303,12 +307,10 @@ public class CompactionsPurgeTest extends SchemaLoader
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
         for (int i = 0; i < 5; i++)
-        {
             rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
-        }
         rm.apply();
 
-        // Check that the second insert did went in
+        // Check that the second insert went in
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)
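
A worked timeline for testMinTimestampPurge above, using the values from the test itself:

    t=8   insert c1 and c2 for key3   -> flushed: sstable A
    t=10  tombstone for c1            -> flushed: sstable B
    t=9   tombstone for c2            -> flushed: sstable C (left out of the compaction)
    compact {A, B}

sstable C overlaps key3 but is outside the compaction, and its minimum timestamp is 9, so maxPurgeableTimestamp(key3) can be at most 9. The c1 tombstone carries timestamp 10, which is not below that ceiling, so it must survive: the controller cannot know that C holds only an unrelated c2 tombstone, and dropping the c1 tombstone could resurrect shadowed data. That is what the assertFalse/assertEquals pair at the end of the test verifies.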

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
deleted file mode 100644
index 1c286c8..0000000
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import com.google.common.base.Objects;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.MappedFileDataInput;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CloseableIterator;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class LazilyCompactedRowTest extends SchemaLoader
-{
-    private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
-    {
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-
-        // compare eager and lazy compactions
-        AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
-                                                                  strategy.getScanners(sstables),
-                                                                  new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
-                                                                 strategy.getScanners(sstables),
-                                                                 new LazilyCompactingController(cfs, sstables, gcBefore));
-        assertBytes(cfs, eager, lazy);
-
-        // compare eager and parallel-lazy compactions
-        eager = new CompactionIterable(OperationType.UNKNOWN,
-                                       strategy.getScanners(sstables),
-                                       new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
-                                                                             strategy.getScanners(sstables),
-                                                                             new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
-                                                                             0);
-        assertBytes(cfs, eager, parallel);
-    }
-
-    private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
-    {
-        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
-        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
-        while (true)
-        {
-            if (!iter1.hasNext())
-            {
-                assert !iter2.hasNext();
-                break;
-            }
-
-            AbstractCompactedRow row1 = iter1.next();
-            AbstractCompactedRow row2 = iter2.next();
-            DataOutputBuffer out1 = new DataOutputBuffer();
-            DataOutputBuffer out2 = new DataOutputBuffer();
-            row1.write(-1, out1);
-            row2.write(-1, out2);
-
-            File tmpFile1 = File.createTempFile("lcrt1", null);
-            File tmpFile2 = File.createTempFile("lcrt2", null);
-
-            tmpFile1.deleteOnExit();
-            tmpFile2.deleteOnExit();
-
-            try (OutputStream os1 = new FileOutputStream(tmpFile1);
-                 OutputStream os2 = new FileOutputStream(tmpFile2))
-            {
-                os1.write(out1.getData()); // writing data from row1
-                os2.write(out2.getData()); // writing data from row2
-            }
-
-            try (MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
-                 MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0))
-            {
-                // row key
-                assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
-    
-                // cf metadata
-                ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
-                ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
-                cf1.delete(DeletionTime.serializer.deserialize(in1));
-                cf2.delete(DeletionTime.serializer.deserialize(in2));
-                assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
-                // columns
-                while (true)
-                {
-                    Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
-                    Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
-                    assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
-                    if (c1 == null)
-                        break;
-                }
-                // that should be everything
-                assert in1.available() == 0;
-                assert in2.available() == 0;
-            }
-        }
-    }
-
-    private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws NoSuchAlgorithmException
-    {
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
-        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
-        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
-        while (true)
-        {
-            if (!iter1.hasNext())
-            {
-                assert !iter2.hasNext();
-                break;
-            }
-
-            AbstractCompactedRow row1 = iter1.next();
-            AbstractCompactedRow row2 = iter2.next();
-            MessageDigest digest1 = MessageDigest.getInstance("MD5");
-            MessageDigest digest2 = MessageDigest.getInstance("MD5");
-
-            row1.update(digest1);
-            row2.update(digest2);
-
-            assert MessageDigest.isEqual(digest1.digest(), digest2.digest());
-        }
-    }
-
-    @Test
-    public void testOneRow() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testOneRowTwoColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testOneRowManyColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBuffer.wrap("k".getBytes());
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        for (int i = 0; i < 1000; i++)
-            rm.add("Standard1", ByteBufferUtil.bytes(i), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        DataOutputBuffer out = new DataOutputBuffer();
-        RowMutation.serializer.serialize(rm, out, MessagingService.current_version);
-        assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testTwoRows() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testTwoRowsTwoColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testManyRows() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        final int ROWS_PER_SSTABLE = 10;
-        for (int j = 0; j < (cfs.metadata.getIndexInterval() * 3) / ROWS_PER_SSTABLE; j++)
-        {
-            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
-            {
-                ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(i % 2));
-                RowMutation rm = new RowMutation("Keyspace1", key);
-                rm.add("Standard1", ByteBufferUtil.bytes(String.valueOf(i / 2)), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
-                rm.apply();
-            }
-            cfs.forceBlockingFlush();
-        }
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    private static class LazilyCompactingController extends CompactionController
-    {
-        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
-        {
-            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
-        }
-
-        @Override
-        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-        {
-            return new LazilyCompactedRow(this, rows);
-        }
-    }
-
-    private static class PreCompactingController extends CompactionController
-    {
-        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
-        {
-            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
-        }
-
-        @Override
-        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-        {
-            return new PrecompactedRow(this, rows);
-        }
-    }
-}
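
(With PrecompactedRow and ParallelCompactionIterable removed by this patch, the test above had nothing left to compare: every case pitted one of them, byte-for-byte or digest-for-digest, against LazilyCompactedRow, which is now the only row-merging path.)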

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 9fa5d89..757254c 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.repair;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.security.MessageDigest;
 import java.util.UUID;
 
 import org.junit.After;
@@ -27,11 +30,12 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.TreeMapBackedSortedColumns;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -43,11 +47,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.SimpleCondition;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class ValidatorTest extends SchemaLoader
 {
@@ -109,8 +109,7 @@ public class ValidatorTest extends SchemaLoader
 
         // add a row
         Token mid = partitioner.midpoint(range.left, range.right);
-        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
-                                                 TreeMapBackedSortedColumns.factory.create(cfs.metadata)));
+        validator.add(new CompactedRowStub(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!"))));
         validator.complete();
 
         // confirm that the tree was validated
@@ -121,6 +120,28 @@ public class ValidatorTest extends SchemaLoader
             lock.await();
     }
 
+    private static class CompactedRowStub extends AbstractCompactedRow
+    {
+        private CompactedRowStub(DecoratedKey key)
+        {
+            super(key);
+        }
+
+        public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void update(MessageDigest digest) { }
+
+        public ColumnStats columnStats()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void close() throws IOException { }
+    }
+
     @Test
     public void testValidatorFailed() throws Throwable
     {