Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:29 UTC

[01/10] git commit: update COLD_READS_TO_OMIT_KEY

Updated Branches:
  refs/heads/cassandra-2.0 7187a8af0 -> d4b5b0dbc
  refs/heads/trunk ba3c1bcb9 -> d39ec221b


update COLD_READS_TO_OMIT_KEY


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3a75035
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3a75035
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3a75035

Branch: refs/heads/trunk
Commit: f3a75035ac105d032988d8b26e562458ac469699
Parents: 9a94494
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:39:30 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500

----------------------------------------------------------------------
 .../compaction/SizeTieredCompactionStrategyOptions.java | 10 +++++-----
 .../db/compaction/SizeTieredCompactionStrategyTest.java | 12 ++++++------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3a75035/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index 711ec6e..c6c5f1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -30,7 +30,7 @@ public final class SizeTieredCompactionStrategyOptions
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
     protected static final String BUCKET_LOW_KEY = "bucket_low";
     protected static final String BUCKET_HIGH_KEY = "bucket_high";
-    protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
+    protected static final String COLD_READS_TO_OMIT_KEY = "cold_reads_to_omit";
 
     protected long minSSTableSize;
     protected double bucketLow;
@@ -45,7 +45,7 @@ public final class SizeTieredCompactionStrategyOptions
         bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
         optionValue = options.get(BUCKET_HIGH_KEY);
         bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
-        optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+        optionValue = options.get(COLD_READS_TO_OMIT_KEY);
         coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
     }
 
@@ -94,17 +94,17 @@ public final class SizeTieredCompactionStrategyOptions
                                                            BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
         }
 
-        double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+        double maxColdReadsRatio = parseDouble(options, COLD_READS_TO_OMIT_KEY, DEFAULT_COLD_READS_TO_OMIT);
         if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
         {
             throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
-                                                           MAX_COLD_READS_RATIO_KEY, optionValue));
+                                                           COLD_READS_TO_OMIT_KEY, optionValue));
         }
 
         uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
         uncheckedOptions.remove(BUCKET_LOW_KEY);
         uncheckedOptions.remove(BUCKET_HIGH_KEY);
-        uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
+        uncheckedOptions.remove(COLD_READS_TO_OMIT_KEY);
 
         return uncheckedOptions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3a75035/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 5e79bd8..6bfa4e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -49,7 +49,7 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
     public void testOptionsValidation() throws ConfigurationException
     {
         Map<String, String> options = new HashMap<>();
-        options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+        options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.35");
         options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
         options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
         options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
@@ -58,21 +58,21 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
 
         try
         {
-            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "-0.5");
             validateOptions(options);
-            fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+            fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
         }
         catch (ConfigurationException e) {}
 
         try
         {
-            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "10.0");
             validateOptions(options);
-            fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+            fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
         }
         catch (ConfigurationException e)
         {
-            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.25");
         }
 
         try

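For reference, a minimal standalone sketch (not part of the commit) of how the renamed option is supplied by a caller; the "cold_reads_to_omit" key string and its 0.0-1.0 range come from the diff above, while the class and variable names below are illustrative only:

import java.util.HashMap;
import java.util.Map;

public class ColdReadsOptionExample
{
    public static void main(String[] args)
    {
        Map<String, String> compactionOptions = new HashMap<>();
        // omit the sstables serving the coldest 5% of reads from compaction
        compactionOptions.put("cold_reads_to_omit", "0.05");
        // per the validation above, values outside [0.0, 1.0] (e.g. the "-0.5"
        // and "10.0" cases exercised by the test) raise a ConfigurationException
        System.out.println(compactionOptions);
    }
}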

[04/10] git commit: Remove CHANGES.txt dups

Posted by jb...@apache.org.
Remove CHANGES.txt dups


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a38dd52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a38dd52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a38dd52

Branch: refs/heads/trunk
Commit: 0a38dd52f284ee75b19850fc98c2e77df6141817
Parents: f3a7503
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 30 00:16:18 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 30 00:16:58 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a38dd52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4815c1c..e73a2b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -81,8 +81,6 @@ Merged from 1.2:
  * Rework token replacement to use replace_address (CASSANDRA-5916)
  * Fix altering column types (CASSANDRA-6185)
  * cqlsh: fix CREATE/ALTER WITH completion (CASSANDRA-6196)
- * Fix altering column types (CASSANDRA-6185)
- * cqlsh: fix CREATE/ALTER WITH completion (CASSANDRA-6196)
  * add windows bat files for shell commands (CASSANDRA-6145)
  * Fix potential stack overflow during range tombstones insertion (CASSANDRA-6181)
  * (Hadoop) Make LOCAL_ONE the default consistency level (CASSANDRA-6214)


[07/10] git commit: Fixes for compacting larger-than-memory rows patch by jbellis; reviewed by marcuse for CASSANDRA-6274

Posted by jb...@apache.org.
Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db

Branch: refs/heads/trunk
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  22 ++++----
 .../db/compaction/AbstractCompactedRow.java     |   4 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/LazilyCompactedRow.java       |  34 ++++++++----
 .../cassandra/db/compaction/Scrubber.java       |  52 ++++++++++++-------
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
  * Compact hottest sstables first and optionally omit coldest from
    compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
             }
             ColumnIndex index = build();
 
-            finish();
+            maybeWriteEmptyRowHeader();
 
             return index;
         }
 
-        public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+        /**
+         * The important distinction wrt build() is that we may be building for a row that ends up
+         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+         * columns shadowed by an expired tombstone).  Thus, it is the caller's responsibility
+         * to decide whether to write the header for an empty row.
+         */
+        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
         {
-            for (OnDiskAtom c : columns)
-                add(c);
-            ColumnIndex index = build();
-
-            finish();
-
-            return index;
+            while (columns.hasNext())
+                add(columns.next());
+            return build();
         }
 
         public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
             return result;
         }
 
-        public void finish() throws IOException
+        public void maybeWriteEmptyRowHeader() throws IOException
         {
             if (!deletionInfo.isLive())
                 maybeWriteRowHeader();

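To make the contract change concrete, here is a simplified self-contained model (not from the commit; ColumnIndex.Builder and OnDiskAtom are Cassandra internals, so a stub Builder stands in for them):

import java.util.Arrays;
import java.util.Iterator;

public class IndexBuildingSketch
{
    interface Builder
    {
        void add(String atom);
        String build();                  // assemble the index from the added atoms
        void maybeWriteEmptyRowHeader(); // per the diff, writes the row header only
                                         // when a row-level tombstone must survive
    }

    // Write path (see the SSTableWriter hunk below): the row is known to
    // exist, so the builder finishes the row itself.
    static String buildRow(Builder builder, Iterable<String> atoms)
    {
        for (String a : atoms)
            builder.add(a);
        String index = builder.build();
        builder.maybeWriteEmptyRowHeader();
        return index;
    }

    // Compaction path (buildForCompaction): the input may be nothing but
    // expired tombstones, so the row might vanish entirely; the caller
    // (LazilyCompactedRow.write() below) decides later whether any header
    // should be written.
    static String buildRowForCompaction(Builder builder, Iterator<String> atoms)
    {
        while (atoms.hasNext())
            builder.add(atoms.next());
        return builder.build(); // note: no header written here
    }

    public static void main(String[] args)
    {
        Builder stub = new Builder()
        {
            final StringBuilder sb = new StringBuilder();
            public void add(String atom) { sb.append(atom); }
            public String build() { return sb.toString(); }
            public void maybeWriteEmptyRowHeader() { System.out.println("header"); }
        };
        System.out.println(buildRowForCompaction(stub, Arrays.asList("a", "b").iterator()));
    }
}
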
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
 
     /**
      * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
-     * are included in the compaction set
+     * older than @param maxDeletionTimestamp are included in the compaction set
      */
     public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
+    private final ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private long maxTombstoneTimestamp;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
+        maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
+        // no other versions of this row present.
+        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(iterator());
             if (columnsIndex.columnsIndex.isEmpty())
             {
                 boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();

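The constructor comment above compresses a subtle argument; the following self-contained toy model (all names and the timestamp check are hypothetical simplifications of CompactionController.shouldPurge()) spells out why Long.MAX_VALUE is the safe cutoff:

public class PurgeDecisionSketch
{
    // Toy stand-in for shouldPurge(key, maxDeletionTimestamp): purging is safe
    // only if no sstable outside the compaction set could hold a version of
    // the row older than the given cutoff.
    static boolean shouldPurge(long maxDeletionTimestamp, long[] minTimestampsOutsideSet)
    {
        for (long min : minTimestampsOutsideSet)
            if (min < maxDeletionTimestamp)
                return false; // data the tombstone shadows may live elsewhere
        return true;
    }

    public static void main(String[] args)
    {
        long maxRowTombstoneTimestamp = 10; // known before iterating the row
        long[] outsideSet = { 15 };         // an sstable not being compacted

        // Passing the row-level maximum looks safe here (15 >= 10)...
        System.out.println(shouldPurge(maxRowTombstoneTimestamp, outsideSet)); // true

        // ...but a cell-level tombstone with timestamp 20 may only surface
        // while iterating the row, and it would shadow that other sstable's
        // data. Long.MAX_VALUE purges only when no other version exists at all:
        System.out.println(shouldPurge(Long.MAX_VALUE, outsideSet));           // false
    }
}
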
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         public int compare(Row r1, Row r2)
          {
              return r1.key.compareTo(r2.key);
          }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                     if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
         if (!outOfOrderRows.isEmpty())
         {
             SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-            for (AbstractCompactedRow row : outOfOrderRows)
-                inOrderWriter.append(row);
+            for (Row row : outOfOrderRows)
+                inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
             outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
         }
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+    // TODO bitch if the row is too large?  if it is, there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+        // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;

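The new saveOutOfOrderRow() leans on a sorted container absorbing mis-sorted input; a minimal standalone illustration (TreeMapBackedSortedColumns is a Cassandra internal, modeled here with a plain java.util.TreeMap and string cell names):

import java.util.TreeMap;

public class OutOfOrderCellsExample
{
    public static void main(String[] args)
    {
        // cells can arrive out of order when scrubbing a damaged sstable
        TreeMap<String, String> cells = new TreeMap<>();
        cells.put("c", "v3");
        cells.put("a", "v1");
        cells.put("b", "v2");
        // a tree-backed container re-sorts on insert instead of failing,
        // at O(log n) per cell -- the "worst-case for TMBSC" the comment
        // accepts because out-of-order rows should be rare
        System.out.println(cells); // {a=v1, b=v2, c=v3}
    }
}
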
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This write the atom on disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ


[10/10] git commit: merge from 2.0

Posted by jb...@apache.org.
merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d39ec221
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d39ec221
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d39ec221

Branch: refs/heads/trunk
Commit: d39ec221be61b59805eb27d5e767f1958678ac3f
Parents: f1d5b5d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:18:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:18:23 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d39ec221/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8cdbbe1..5a2e56b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@
 
 
 2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
  * Compact hottest sstables first and optionally omit coldest from
    compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)


[05/10] git commit: Make MS.getBoundTerms() correct for batched statements

Posted by jb...@apache.org.
Make MS.getBoundTerms() correct for batched statements

patch by Aleksey Yeschenko and Sam Tunnicliffe; reviewed by
Sylvain Lebresne


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7187a8af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7187a8af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7187a8af

Branch: refs/heads/trunk
Commit: 7187a8af05b53d5318f3e4fa47122b232c4c7e52
Parents: 0a38dd5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Oct 30 13:20:47 2013 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Oct 30 13:20:47 2013 +0300

----------------------------------------------------------------------
 .../apache/cassandra/cql3/VariableSpecifications.java    |  7 +++++++
 .../cassandra/cql3/statements/DeleteStatement.java       |  6 +++---
 .../cassandra/cql3/statements/ModificationStatement.java | 11 ++++++++---
 .../cassandra/cql3/statements/UpdateStatement.java       |  8 ++++----
 4 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index ecdba6f..297999a 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -24,6 +24,7 @@ public class VariableSpecifications
 {
     private final List<ColumnIdentifier> variableNames;
     private final ColumnSpecification[] specs;
+    private int collectedCount;
 
     public VariableSpecifications(List<ColumnIdentifier> variableNames)
     {
@@ -48,5 +49,11 @@ public class VariableSpecifications
         if (name != null)
             spec = new ColumnSpecification(spec.ksName, spec.cfName, name, spec.type);
         specs[bindIndex] = spec;
+        collectedCount++;
+    }
+
+    public int getCollectedCount()
+    {
+        return collectedCount;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 3704e14..4852cf7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.utils.Pair;
  */
 public class DeleteStatement extends ModificationStatement
 {
-    private DeleteStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+    private DeleteStatement(CFMetaData cfm, Attributes attrs)
     {
-        super(boundTerms, cfm, attrs);
+        super(cfm, attrs);
     }
 
     public boolean requireFullClusteringKey()
@@ -105,7 +105,7 @@ public class DeleteStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            DeleteStatement stmt = new DeleteStatement(boundNames.size(), cfDef.cfm, attrs);
+            DeleteStatement stmt = new DeleteStatement(cfDef.cfm, attrs);
 
             for (Operation.RawDeletion deletion : deletions)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0f425b8..7aebc48 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -46,19 +46,18 @@ public abstract class ModificationStatement implements CQLStatement
 {
     private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
 
-    private final int boundTerms;
     public final CFMetaData cfm;
     public final Attributes attrs;
 
     private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
     private final List<Operation> columnOperations = new ArrayList<Operation>();
 
+    private int boundTerms;
     private List<Operation> columnConditions;
     private boolean ifNotExists;
 
-    public ModificationStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+    public ModificationStatement(CFMetaData cfm, Attributes attrs)
     {
-        this.boundTerms = boundTerms;
         this.cfm = cfm;
         this.attrs = attrs;
     }
@@ -579,6 +578,10 @@ public abstract class ModificationStatement implements CQLStatement
             CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
             CFDefinition cfDef = metadata.getCfDef();
 
+            // The collected count at the beginning of preparation.
+            // Will start at non-zero for statements nested inside a BatchStatement (the second and subsequent ones).
+            int collected = boundNames.getCollectedCount();
+
             Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
             preparedAttributes.collectMarkerSpecification(boundNames);
 
@@ -634,6 +637,8 @@ public abstract class ModificationStatement implements CQLStatement
                     }
                 }
             }
+
+            stmt.boundTerms = boundNames.getCollectedCount() - collected;
             return stmt;
         }
 

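The per-statement count is recovered as a delta of a shared counter; the sketch below (a hypothetical stand-in for VariableSpecifications, not the real class) shows why the subtraction matters for batches:

final class MarkerCollector
{
    private int collectedCount;
    void collect() { collectedCount++; }           // models collectMarkerSpecification()
    int getCollectedCount() { return collectedCount; }
}

public class BoundTermsExample
{
    public static void main(String[] args)
    {
        MarkerCollector boundNames = new MarkerCollector();

        // first batched statement, e.g. "UPDATE ... SET v = ? WHERE k = ?"
        int collected = boundNames.getCollectedCount();                    // 0
        boundNames.collect();
        boundNames.collect();
        int stmt1BoundTerms = boundNames.getCollectedCount() - collected;  // 2

        // the second statement starts from a non-zero collector, so the old
        // boundNames.size()-style counting would have wrongly reported 3
        collected = boundNames.getCollectedCount();                        // 2
        boundNames.collect(); // e.g. "DELETE ... WHERE k = ?"
        int stmt2BoundTerms = boundNames.getCollectedCount() - collected;  // 1

        System.out.println(stmt1BoundTerms + " " + stmt2BoundTerms);       // 2 1
    }
}
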
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7187a8af/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 12348df..a387962 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -35,9 +35,9 @@ public class UpdateStatement extends ModificationStatement
 {
     private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-    private UpdateStatement(int boundTerms, CFMetaData cfm, Attributes attrs)
+    private UpdateStatement(CFMetaData cfm, Attributes attrs)
     {
-        super(boundTerms, cfm, attrs);
+        super(cfm, attrs);
     }
 
     public boolean requireFullClusteringKey()
@@ -131,7 +131,7 @@ public class UpdateStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
 
             // Created from an INSERT
             if (stmt.isCounter())
@@ -201,7 +201,7 @@ public class UpdateStatement extends ModificationStatement
 
         protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
         {
-            UpdateStatement stmt = new UpdateStatement(boundNames.size(), cfDef.cfm, attrs);
+            UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
 
             for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
             {


[09/10] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1d5b5d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1d5b5d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1d5b5d2

Branch: refs/heads/trunk
Commit: f1d5b5d2af559705583fe52cdbc1c95d8af79862
Parents: 36cdf34 d4b5b0d
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:18:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:18:15 2013 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[03/10] git commit: Compact hottest sstables first and optionally omit coldest patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109

Posted by jb...@apache.org.
Compact hottest sstables first and optionally omit coldest
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37285304
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37285304
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37285304

Branch: refs/heads/trunk
Commit: 37285304ee484122410c977399024f2af132753c
Parents: 5eddf18
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:25:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../SizeTieredCompactionStrategy.java           | 149 +++++++++++---
 .../SizeTieredCompactionStrategyOptions.java    |  53 ++---
 .../cassandra/io/sstable/SSTableReader.java     |   4 +-
 .../SizeTieredCompactionStrategyTest.java       | 192 ++++++++++++++++++-
 5 files changed, 341 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bf7f21..4815c1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.3
+ * Compact hottest sstables first and optionally omit coldest from
+   compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)
  * cqlsh: fix LIST USERS output (CASSANDRA-6242)
  * Add IRequestSink interface (CASSANDRA-6248)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index cee5f97..5115860 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.Map.Entry;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +55,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         int minThreshold = cfs.getMinimumCompactionThreshold();
         int maxThreshold = cfs.getMaximumCompactionThreshold();
 
-        Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
-        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), options.bucketHigh, options.bucketLow, options.minSSTableSize);
+        Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+        candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
         updateEstimatedCompactionsByTasks(buckets);
         List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
@@ -77,34 +80,88 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singletonList(sstablesWithTombstones.get(0));
     }
 
+    /**
+     * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
+     * across all sstables
+     * @param sstables all sstables to consider
+     * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+     * @return a list of sstables with the coldest sstables excluded until the reads they represent reach coldReadsToOmit
+     */
+    @VisibleForTesting
+    static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+    {
+        // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+        Collections.sort(sstables, new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                int comparison = Double.compare(hotness(o1), hotness(o2));
+                if (comparison != 0)
+                    return comparison;
+
+                return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+            }
+        });
+
+        // calculate the total reads/sec across all sstables
+        double totalReads = 0.0;
+        for (SSTableReader sstr : sstables)
+            if (sstr.readMeter != null)
+                totalReads += sstr.readMeter.twoHourRate();
+
+        // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+        if (totalReads == 0.0)
+            return sstables;
+
+        // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+        double maxColdReads = coldReadsToOmit * totalReads;
+
+        double totalColdReads = 0.0;
+        int cutoffIndex = 0;
+        while (cutoffIndex < sstables.size())
+        {
+            double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+            if (totalColdReads + reads > maxColdReads)
+                break;
+
+            totalColdReads += reads;
+            cutoffIndex++;
+        }
+
+        return sstables.subList(cutoffIndex, sstables.size());
+    }
+
+    /**
+     * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+     * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+     * @return a bucket (list) of sstables to compact
+     */
     public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
     {
-        // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
-        List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+        // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+        final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
         for (List<SSTableReader> bucket : buckets)
         {
-            if (bucket.size() < minThreshold)
-                continue;
-
-            Collections.sort(bucket, new Comparator<SSTableReader>()
-            {
-                public int compare(SSTableReader o1, SSTableReader o2)
-                {
-                    return o1.descriptor.generation - o2.descriptor.generation;
-                }
-            });
-            List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
-            prunedBuckets.add(prunedBucket);
+            Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+            if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+                prunedBucketsAndHotness.add(bucketAndHotness);
         }
-        if (prunedBuckets.isEmpty())
+        if (prunedBucketsAndHotness.isEmpty())
             return Collections.emptyList();
 
-        // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
-        return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+        // prefer compacting the hottest bucket
+        Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
         {
-            public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+            public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
             {
-                return Longs.compare(avgSize(o1), avgSize(o2));
+                int comparison = Double.compare(o1.right, o2.right);
+                if (comparison != 0)
+                    return comparison;
+
+                // break ties by compacting the smallest sstables first (this will probably only happen for
+                // system tables and new/unread sstables)
+                return Long.compare(avgSize(o1.left), avgSize(o2.left));
             }
 
             private long avgSize(List<SSTableReader> sstables)
@@ -115,6 +172,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 return n / sstables.size();
             }
         });
+
+        return hottest.left;
+    }
+
+    /**
+     * Returns a (bucket, hotness) pair; the caller is responsible for checking the bucket against minThreshold.
+     * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+     **/
+    @VisibleForTesting
+    static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+    {
+        // sort by sstable hotness (descending)
+        Collections.sort(bucket, new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return -1 * Double.compare(hotness(o1), hotness(o2));
+            }
+        });
+
+        // and then trim the coldest sstables off the end to meet the maxThreshold
+        List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+
+        // bucket hotness is the sum of the hotness of all sstable members
+        double bucketHotness = 0.0;
+        for (SSTableReader sstr : prunedBucket)
+            bucketHotness += hotness(sstr);
+
+        return Pair.create(prunedBucket, bucketHotness);
+    }
+
+    /**
+     * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+     */
+    private static double hotness(SSTableReader sstr)
+    {
+        // system tables don't have read meters, just use 0.0 for the hotness
+        return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
     }
 
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
@@ -124,13 +219,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         while (true)
         {
-            List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore);
+            List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
 
-            if (smallestBucket.isEmpty())
+            if (hottestBucket.isEmpty())
                 return null;
 
-            if (cfs.getDataTracker().markCompacting(smallestBucket))
-                return new CompactionTask(cfs, smallestBucket, gcBefore);
+            if (cfs.getDataTracker().markCompacting(hottestBucket))
+                return new CompactionTask(cfs, hottestBucket, gcBefore);
         }
     }
 
@@ -253,4 +348,4 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             cfs.getMinimumCompactionThreshold(),
             cfs.getMaximumCompactionThreshold());
     }
-}
+}
\ No newline at end of file

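For clarity, a standalone sketch of the filterColdSSTables() loop above ("Table" is a hypothetical stand-in for SSTableReader and its two-hour read rate; the size-on-disk tiebreak is omitted):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ColdFilterExample
{
    static final class Table
    {
        final String name;
        final double readsPerSec;
        Table(String name, double readsPerSec) { this.name = name; this.readsPerSec = readsPerSec; }
        public String toString() { return name; }
    }

    static List<Table> filterCold(List<Table> tables, double coldReadsToOmit)
    {
        List<Table> sorted = new ArrayList<>(tables);
        // coldest first, mirroring the hotness sort in the diff
        Collections.sort(sorted, new Comparator<Table>()
        {
            public int compare(Table o1, Table o2)
            {
                return Double.compare(o1.readsPerSec, o2.readsPerSec);
            }
        });

        double totalReads = 0.0;
        for (Table t : sorted)
            totalReads += t.readsPerSec;
        if (totalReads == 0.0)
            return sorted; // no read data yet (e.g. system tables): keep everything

        // drop the coldest tables while the omitted read rate stays under budget
        double maxColdReads = coldReadsToOmit * totalReads;
        double omitted = 0.0;
        int cutoff = 0;
        while (cutoff < sorted.size() && omitted + sorted.get(cutoff).readsPerSec <= maxColdReads)
        {
            omitted += sorted.get(cutoff).readsPerSec;
            cutoff++;
        }
        return sorted.subList(cutoff, sorted.size());
    }

    public static void main(String[] args)
    {
        List<Table> tables = new ArrayList<>();
        tables.add(new Table("cold", 1.0));
        tables.add(new Table("warm", 9.0));
        tables.add(new Table("hot", 90.0));
        // with coldReadsToOmit = 0.05, only "cold" (1% of reads) fits the budget
        System.out.println(filterCold(tables, 0.05)); // [warm, hot]
    }
}
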
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index d7c9075..711ec6e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,31 +26,48 @@ public final class SizeTieredCompactionStrategyOptions
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
     protected static final double DEFAULT_BUCKET_LOW = 0.5;
     protected static final double DEFAULT_BUCKET_HIGH = 1.5;
+    protected static final double DEFAULT_COLD_READS_TO_OMIT = 0.0;
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
     protected static final String BUCKET_LOW_KEY = "bucket_low";
     protected static final String BUCKET_HIGH_KEY = "bucket_high";
+    protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
 
     protected long minSSTableSize;
     protected double bucketLow;
     protected double bucketHigh;
+    protected double coldReadsToOmit;
 
     public SizeTieredCompactionStrategyOptions(Map<String, String> options)
     {
-
         String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
         minSSTableSize = optionValue == null ? DEFAULT_MIN_SSTABLE_SIZE : Long.parseLong(optionValue);
         optionValue = options.get(BUCKET_LOW_KEY);
         bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
         optionValue = options.get(BUCKET_HIGH_KEY);
         bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
+        optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+        coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
     }
 
     public SizeTieredCompactionStrategyOptions()
     {
-
         minSSTableSize = DEFAULT_MIN_SSTABLE_SIZE;
         bucketLow = DEFAULT_BUCKET_LOW;
         bucketHigh = DEFAULT_BUCKET_HIGH;
+        coldReadsToOmit = DEFAULT_COLD_READS_TO_OMIT;
+    }
+
+    private static double parseDouble(Map<String, String> options, String key, double defaultValue) throws ConfigurationException
+    {
+        String optionValue = options.get(key);
+        try
+        {
+            return optionValue == null ? defaultValue : Double.parseDouble(optionValue);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable float for %s", optionValue, key), e);
+        }
     }
 
     public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
@@ -69,36 +86,26 @@ public final class SizeTieredCompactionStrategyOptions
             throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MIN_SSTABLE_SIZE_KEY), e);
         }
 
-        double bucketLow, bucketHigh;
-        optionValue = options.get(BUCKET_LOW_KEY);
-        try
-        {
-            bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
-        }
-        catch (NumberFormatException e)
-        {
-            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_LOW), e);
-        }
-
-        optionValue = options.get(BUCKET_HIGH_KEY);
-        try
-        {
-            bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
-        }
-        catch (NumberFormatException e)
+        double bucketLow = parseDouble(options, BUCKET_LOW_KEY, DEFAULT_BUCKET_LOW);
+        double bucketHigh = parseDouble(options, BUCKET_HIGH_KEY, DEFAULT_BUCKET_HIGH);
+        if (bucketHigh <= bucketLow)
         {
-            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_HIGH), e);
+            throw new ConfigurationException(String.format("%s value (%s) is less than or equal to the %s value (%s)",
+                                                           BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
         }
 
-        if (bucketHigh <= bucketLow)
+        double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+        if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
         {
-            throw new ConfigurationException(String.format("Bucket high value (%s) is less than or equal bucket low value (%s)", bucketHigh, bucketLow));
+            throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
+                                                           MAX_COLD_READS_RATIO_KEY, optionValue));
         }
 
         uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
         uncheckedOptions.remove(BUCKET_LOW_KEY);
         uncheckedOptions.remove(BUCKET_HIGH_KEY);
+        uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
 
         return uncheckedOptions;
     }
-}
+}
\ No newline at end of file

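The consolidated parseDouble() helper is small enough to reproduce standalone (ConfigurationException is a Cassandra class; a plain Exception subclass stands in for it here):

import java.util.HashMap;
import java.util.Map;

public class ParseDoubleExample
{
    static class ConfigurationException extends Exception
    {
        ConfigurationException(String msg, Throwable cause) { super(msg, cause); }
    }

    // mirrors the helper added in the diff above
    static double parseDouble(Map<String, String> options, String key, double defaultValue)
            throws ConfigurationException
    {
        String optionValue = options.get(key);
        try
        {
            return optionValue == null ? defaultValue : Double.parseDouble(optionValue);
        }
        catch (NumberFormatException e)
        {
            throw new ConfigurationException(String.format("%s is not a parsable float for %s", optionValue, key), e);
        }
    }

    public static void main(String[] args) throws ConfigurationException
    {
        Map<String, String> options = new HashMap<>();
        options.put("bucket_low", "0.5");
        System.out.println(parseDouble(options, "bucket_low", 0.5));  // 0.5 (explicit)
        System.out.println(parseDouble(options, "bucket_high", 1.5)); // 1.5 (default)
        // a value like "abc" would now raise a ConfigurationException naming
        // both the bad value and the offending key, for every double option
    }
}
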
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 9837f4c..c961d44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
@@ -103,7 +104,8 @@ public class SSTableReader extends SSTable implements Closeable
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
 
-    public final RestorableMeter readMeter;
+    @VisibleForTesting
+    public RestorableMeter readMeter;
 
     public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 89604c5..5e79bd8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -17,17 +17,80 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.getBuckets;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.mostInterestingBucket;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.trimToThresholdWithHotness;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.filterColdSSTables;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.validateOptions;
 
-public class SizeTieredCompactionStrategyTest
+import static org.junit.Assert.*;
+
+public class SizeTieredCompactionStrategyTest extends SchemaLoader
 {
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+        options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+        options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
+        options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+            validateOptions(options);
+            fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+        }
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "1000.0");
+            validateOptions(options);
+            fail("bucket_low greater than bucket_high should be rejected");
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
     @Test
     public void testGetBuckets()
     {
@@ -39,7 +102,7 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        List<List<String>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+        List<List<String>> buckets = getBuckets(pairs, 1.5, 0.5, 2);
         assertEquals(3, buckets.size());
 
         for (List<String> bucket : buckets)
@@ -59,7 +122,7 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+        buckets = getBuckets(pairs, 1.5, 0.5, 2);
         assertEquals(2, buckets.size());
 
         for (List<String> bucket : buckets)
@@ -80,7 +143,120 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 10);
+        buckets = getBuckets(pairs, 1.5, 0.5, 10);
         assertEquals(1, buckets.size());
     }
-}
+
+    @Test
+    public void testPrepBucket() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "Standard1";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 3 sstables
+        int numSSTables = 3;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+        Pair<List<SSTableReader>, Double> bucket;
+
+        List<SSTableReader> interestingBucket = mostInterestingBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+        assertTrue("nothing should be returned when all buckets are below the min threshold", interestingBucket.isEmpty());
+
+        sstrs.get(0).readMeter = new RestorableMeter(100.0, 100.0);
+        sstrs.get(1).readMeter = new RestorableMeter(200.0, 200.0);
+        sstrs.get(2).readMeter = new RestorableMeter(300.0, 300.0);
+
+        long estimatedKeys = sstrs.get(0).estimatedKeys();
+
+        // if we have more than the max threshold, the coldest should be dropped
+        bucket = trimToThresholdWithHotness(sstrs, 2);
+        assertEquals("one bucket should have been dropped", 2, bucket.left.size());
+        double expectedBucketHotness = (200.0 + 300.0) / estimatedKeys;
+        assertEquals(String.format("bucket hotness (%f) should be close to %f", bucket.right, expectedBucketHotness),
+                     expectedBucketHotness, bucket.right, 1.0);
+    }
+
+    @Test
+    public void testFilterColdSSTables() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "Standard1";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 10 sstables
+        int numSSTables = 10;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> filtered;
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = null;
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
+
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(0.0, 0.0);
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
+
+        // leave all read rates at 0 besides one
+        sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("there should only be one hot sstable", 1, filtered.size());
+        assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
+
+        // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
+        // rate 1.0 should be ignored, but not the third
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(0.0, 0.0);
+        sstrs.get(0).readMeter = new RestorableMeter(97.0, 97.0);
+        sstrs.get(1).readMeter = new RestorableMeter(1.0, 1.0);
+        sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
+        sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
+
+        filtered = filterColdSSTables(sstrs, 0.025);
+        assertEquals(2, filtered.size());
+        assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
+
+        // make sure a threshold of 0.0 doesn't result in any sstables being filtered
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+        filtered = filterColdSSTables(sstrs, 0.0);
+        assertEquals(sstrs.size(), filtered.size());
+
+        // just for fun, set a threshold where all sstables are considered cold
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+        filtered = filterColdSSTables(sstrs, 1.0);
+        assertTrue(filtered.isEmpty());
+    }
+}
\ No newline at end of file
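
The threshold arithmetic exercised by testFilterColdSSTables can be sanity-checked in isolation. A simplified sketch of the cumulative-coldness rule, treating hotness as just the raw two-hour read rate (hypothetical ColdFilterDemo class, not the strategy's actual implementation):

    import java.util.Arrays;

    // Simplified model of filterColdSSTables: sort rates ascending and omit the
    // coldest sstables while their cumulative rate stays within
    // coldReadsToOmit * totalRate. Hypothetical demo class.
    public class ColdFilterDemo
    {
        static int countOmitted(double[] rates, double coldReadsToOmit)
        {
            double total = Arrays.stream(rates).sum();
            if (total == 0)
                return 0; // matches the test: all-zero read meters filter nothing
            double[] sorted = rates.clone();
            Arrays.sort(sorted); // coldest first
            double cold = 0;
            int omitted = 0;
            for (double rate : sorted)
            {
                if (cold + rate > coldReadsToOmit * total)
                    break;
                cold += rate;
                omitted++;
            }
            return omitted;
        }

        public static void main(String[] args)
        {
            // Mirrors the test: total rate 100, threshold 2.5% => cold budget 2.5, so
            // the six idle sstables plus two of the rate-1.0 ones are omitted
            // (2.0 <= 2.5) but not the third (3.0 > 2.5), leaving two in the bucket.
            double[] rates = { 97.0, 1.0, 1.0, 1.0, 0, 0, 0, 0, 0, 0 };
            System.out.println(countOmitted(rates, 0.025)); // 8, i.e. 10 - 8 = 2 remain
        }
    }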


[08/10] git commit: Remove multithreaded compaction and PCR patch by jbellis; reviewed by marcuse for CASSANDRA-6142

Posted by jb...@apache.org.
Remove multithreaded compaction and PCR
patch by jbellis; reviewed by marcuse for CASSANDRA-6142


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36cdf34b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36cdf34b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36cdf34b

Branch: refs/heads/trunk
Commit: 36cdf34bd92ede5ad99447e10d90e6caa1fd743a
Parents: ba3c1bc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:17:23 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:17:23 2013 -0500

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   7 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |   7 -
 .../org/apache/cassandra/db/ColumnIndex.java    |  19 +-
 .../db/compaction/AbstractCompactedRow.java     |   4 +-
 .../db/compaction/CompactionController.java     |  58 +--
 .../db/compaction/CompactionIterable.java       |   9 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../cassandra/db/compaction/CompactionTask.java |   4 +-
 .../db/compaction/LazilyCompactedRow.java       |  91 +++--
 .../compaction/ParallelCompactionIterable.java  | 403 -------------------
 .../db/compaction/PrecompactedRow.java          | 202 ----------
 .../db/compaction/SSTableSplitter.java          |   4 +-
 .../cassandra/db/compaction/Scrubber.java       |  56 ++-
 .../cassandra/db/compaction/Upgrader.java       |   4 +-
 .../io/sstable/IndexSummaryBuilder.java         |   2 +-
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 .../db/compaction/CompactionsPurgeTest.java     |  32 +-
 .../cassandra/io/LazilyCompactedRowTest.java    | 319 ---------------
 .../apache/cassandra/repair/ValidatorTest.java  |  39 +-
 21 files changed, 173 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 455421a..d6b5b7a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -443,13 +443,6 @@ in_memory_compaction_limit_in_mb: 64
 # Uncomment to make compaction mono-threaded, the pre-0.8 default.
 #concurrent_compactors: 1
 
-# Multi-threaded compaction. When enabled, each compaction will use
-# up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise, 
-# your concern is usually to get compaction to do LESS i/o (see:
-# compaction_throughput_mb_per_sec), not more.
-multithreaded_compaction: false
-
 # Throttles compaction to the given total throughput across the entire
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8f0f22e..e9f48e6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -109,7 +109,6 @@ public class Config
     public Integer in_memory_compaction_limit_in_mb = 64;
     public Integer concurrent_compactors = FBUtilities.getAvailableProcessors();
     public volatile Integer compaction_throughput_mb_per_sec = 16;
-    public Boolean multithreaded_compaction = false;
 
     public Integer max_streaming_retries = 3;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 90f1753..1997347 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -382,8 +382,6 @@ public class DatabaseDescriptor
             logger.debug("setting auto_bootstrap to {}", conf.auto_bootstrap);
         }
 
-        logger.info("{}using multi-threaded compaction", (conf.multithreaded_compaction ? "" : "Not "));
-
         if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
         {
             throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
@@ -856,11 +854,6 @@ public class DatabaseDescriptor
         return conf.concurrent_compactors;
     }
 
-    public static boolean isMultithreadedCompaction()
-    {
-        return conf.multithreaded_compaction;
-    }
-
     public static int getCompactionThroughputMbPerSec()
     {
         return conf.compaction_throughput_mb_per_sec;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..6dd2028 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,19 +140,26 @@ public class ColumnIndex
             }
             ColumnIndex index = build();
 
-            finish();
+            maybeWriteEmptyRowHeader();
 
             return index;
         }
 
-        public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+        /**
+         * The important distinction wrt build() is that we may be building for a row that ends up
+         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+         * columns shadowed by expired tombstones).  Thus, it is the caller's responsibility
+         * to decide whether to write the header for an empty row.
+         */
+        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
         {
-            for (OnDiskAtom c : columns)
+            while (columns.hasNext())
+            {
+                OnDiskAtom c = columns.next();
                 add(c);
+            }
             ColumnIndex index = build();
 
-            finish();
-
             return index;
         }
 
@@ -219,7 +226,7 @@ public class ColumnIndex
             return result;
         }
 
-        public void finish() throws IOException
+        public void maybeWriteEmptyRowHeader() throws IOException
         {
             if (!deletionInfo.isLive())
                 maybeWriteRowHeader();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
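
CompactionManager (in a hunk further down) counts only non-null appends under this new contract. A toy sketch of the null-return convention, with hypothetical stand-ins (writeRow returns a Long in place of RowIndexEntry and takes a flag instead of real column data):

    // Hypothetical demo class; the real write() takes a DataOutput and
    // returns a RowIndexEntry rather than a position.
    public class WriteContractDemo
    {
        // Returns an index "entry" (here just the row's start position) or null when
        // the merged row reduced to nothing but expired tombstones and was skipped.
        static Long writeRow(boolean onlyExpiredTombstones, long currentPosition)
        {
            if (onlyExpiredTombstones)
                return null;
            return currentPosition;
        }

        public static void main(String[] args)
        {
            long totalKeysWritten = 0;
            for (boolean expired : new boolean[]{ false, true, false })
            {
                if (writeRow(expired, 0L) != null)  // mirrors CompactionManager's non-null check
                    totalKeysWritten++;
            }
            System.out.println(totalKeysWritten);   // 2
        }
    }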
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..d7c2e4d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
@@ -51,9 +50,6 @@ public class CompactionController
     public final int gcBefore;
     public final int mergeShardBefore;
 
-    /**
-     * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
-     */
     protected CompactionController(ColumnFamilyStore cfs, int maxValue)
     {
         this(cfs, null, maxValue);
@@ -153,25 +149,24 @@ public class CompactionController
     }
 
     /**
-     * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the verisons of the row
-     * are included in the compaction set
+     * @return the largest timestamp before which it's okay to drop tombstones for the given partition;
+     * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed
+     * in other sstables.
      */
-    public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+    public long maxPurgeableTimestamp(DecoratedKey key)
     {
         List<SSTableReader> filteredSSTables = overlappingTree.search(key);
+        long min = Long.MAX_VALUE;
         for (SSTableReader sstable : filteredSSTables)
         {
-            if (sstable.getMinTimestamp() <= maxDeletionTimestamp)
-            {
-                // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
-                // we check index file instead.
-                if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
-                    return false;
-                else if (sstable.getBloomFilter().isPresent(key.key))
-                    return false;
-            }
+            // if we don't have a bloom filter (bf_fp_chance=1.0 or filter file is missing),
+            // we check the index file instead.
+            if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null)
+                min = Math.min(min, sstable.getMinTimestamp());
+            else if (sstable.getBloomFilter().isPresent(key.key))
+                min = Math.min(min, sstable.getMinTimestamp());
         }
-        return true;
+        return min;
     }
 
     public void invalidateCachedRow(DecoratedKey key)
@@ -179,35 +174,6 @@ public class CompactionController
         cfs.invalidateCachedRow(key);
     }
 
-    /**
-     * @return an AbstractCompactedRow implementation to write the merged rows in question.
-     *
-     * If there is a single source row, the data is from a current-version sstable, we don't
-     * need to purge and we aren't forcing deserialization for scrub, write it unchanged.
-     * Otherwise, we deserialize, purge tombstones, and reserialize in the latest version.
-     */
-    public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-    {
-        long rowSize = 0;
-        for (SSTableIdentityIterator row : rows)
-            rowSize += row.dataSize;
-
-        if (rowSize > DatabaseDescriptor.getInMemoryCompactionLimit())
-        {
-            String keyString = cfs.metadata.getKeyValidator().getString(rows.get(0).getKey().key);
-            logger.info(String.format("Compacting large row %s/%s:%s (%d bytes) incrementally",
-                                      cfs.keyspace.getName(), cfs.name, keyString, rowSize));
-            return new LazilyCompactedRow(this, rows);
-        }
-        return new PrecompactedRow(this, rows);
-    }
-
-    /** convenience method for single-sstable compactions */
-    public AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row)
-    {
-        return getCompactedRow(Collections.singletonList(row));
-    }
-
     public void close()
     {
         SSTableReader.releaseReferences(overlappingSSTables);
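
A runnable sketch of the refactored semantics: the controller now reports a single timestamp per key, computed as the minimum min-timestamp over the overlapping sstables that may contain the key, and callers purge only tombstones strictly older than it (hypothetical PurgeDemo class; the bloom-filter and index checks are omitted):

    import java.util.Arrays;
    import java.util.List;

    // Sketch of the shouldPurge -> maxPurgeableTimestamp refactor: instead of a
    // boolean answered per (key, timestamp) pair, the controller returns one
    // timestamp and callers compare against it as often as they like.
    public class PurgeDemo
    {
        static long maxPurgeableTimestamp(List<Long> overlappingMinTimestamps)
        {
            long min = Long.MAX_VALUE;
            for (long minTimestamp : overlappingMinTimestamps)
                min = Math.min(min, minTimestamp); // newer data may exist at or after this
            return min;
        }

        public static void main(String[] args)
        {
            // Min timestamps of overlapping sstables that may contain the key.
            List<Long> overlapping = Arrays.asList(50L, 200L);
            long maxPurgeable = maxPurgeableTimestamp(overlapping); // 50
            long tombstoneTimestamp = 40L;
            // Equivalent of the old shouldPurge(key, 40): only strictly-older tombstones go.
            System.out.println(tombstoneTimestamp < maxPurgeable);  // true
        }
    }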

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 866907b..132cf25 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -28,7 +30,6 @@ import org.apache.cassandra.utils.MergeIterator;
 
 public class CompactionIterable extends AbstractCompactionIterable
 {
-
     private static final Comparator<OnDiskAtomIterator> comparator = new Comparator<OnDiskAtomIterator>()
     {
         public int compare(OnDiskAtomIterator i1, OnDiskAtomIterator i2)
@@ -54,11 +55,11 @@ public class CompactionIterable extends AbstractCompactionIterable
 
     protected class Reducer extends MergeIterator.Reducer<OnDiskAtomIterator, AbstractCompactedRow>
     {
-        protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+        protected final List<OnDiskAtomIterator> rows = new ArrayList<>();
 
         public void reduce(OnDiskAtomIterator current)
         {
-            rows.add((SSTableIdentityIterator) current);
+            rows.add(current);
         }
 
         protected AbstractCompactedRow getReduced()
@@ -71,7 +72,7 @@ public class CompactionIterable extends AbstractCompactionIterable
                 // create a new container for rows, since we're going to clear ours for the next one,
                 // and the AbstractCompactionRow code should be able to assume that the collection it receives
                 // won't be pulled out from under it.
-                return controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
+                return new LazilyCompactedRow(controller, ImmutableList.copyOf(rows));
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b14c313..966e471 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -579,7 +579,7 @@ public class CompactionManager implements CompactionManagerMBean
                     row = cleanupStrategy.cleanup(row);
                     if (row == null)
                         continue;
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
                     if (writer.append(compactedRow) != null)
                         totalkeysWritten++;
                 }
@@ -905,7 +905,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
             /*
              * The main reason we always purge is that including gcable tombstone would mean that the
@@ -918,7 +918,7 @@ public class CompactionManager implements CompactionManagerMBean
              * a tombstone that could shadow a column in another sstable, but this is doubly not a concern
              * since validation compaction is read-only.
              */
-            return true;
+            return Long.MAX_VALUE;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 1ea18e9..bb07be8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -123,9 +123,7 @@ public class CompactionTask extends AbstractCompactionTask
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
-                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
+        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..2cb014a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Iterators;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.ColumnNameHelper;
 import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -47,18 +46,18 @@ import org.apache.cassandra.utils.StreamingHistogram;
  * in memory at a time is the bloom filter, the index, and one column from each
  * pre-compaction row.
  */
-public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable<OnDiskAtom>
+public class LazilyCompactedRow extends AbstractCompactedRow
 {
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
-    private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
-    private Reducer reducer;
+    private final long maxPurgeableTimestamp;
+    private final ColumnFamily emptyColumnFamily;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private final Reducer reducer;
+    private final Iterator<OnDiskAtom> merger;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +66,39 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+        maxPurgeableTimestamp = controller.maxPurgeableTimestamp(key);
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = rawCf.deletionInfo().maxTimestamp() < maxPurgeableTimestamp ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
+
+        reducer = new Reducer();
+        merger = Iterators.filter(MergeIterator.get(rows, emptyColumnFamily.getComparator().onDiskAtomComparator, reducer), Predicates.notNull());
+    }
+
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
+    {
+        // We should only gc tombstones if shouldPurge == true. But otherwise,
+        // it is still ok to collect columns that are shadowed by their (deleted)
+        // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
+        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
+                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
+                                                                 controller.cfs.indexManager.updaterFor(key));
+        if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
+            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
+        return compacted;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,12 +109,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(merger);
             if (columnsIndex.columnsIndex.isEmpty())
             {
-                boolean cfIrrelevant = shouldPurge
-                                       ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
-                                       : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
+                boolean cfIrrelevant = emptyColumnFamily.deletionInfo().maxTimestamp() < maxPurgeableTimestamp
+                                     ? ColumnFamilyStore.removeDeletedCF(emptyColumnFamily, controller.gcBefore) == null
+                                     : !emptyColumnFamily.isMarkedForDelete(); // tombstones are relevant
                 if (cfIrrelevant)
                     return null;
             }
@@ -104,17 +124,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             throw new RuntimeException(e);
         }
         // reach into the reducer (created during iteration) to get column count, size, max column timestamp
-        // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
-        columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
-                                      reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
-                                      reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
-                                      reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
-                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
-                                      reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.maxColumnNameSeen
+        columnStats = new ColumnStats(reducer.columns,
+                                      reducer.minTimestampSeen,
+                                      Math.max(emptyColumnFamily.deletionInfo().maxTimestamp(), reducer.maxTimestampSeen),
+                                      reducer.maxLocalDeletionTimeSeen,
+                                      reducer.tombstones,
+                                      reducer.minColumnNameSeen,
+                                      reducer.maxColumnNameSeen
         );
-        reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
@@ -142,24 +161,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
 
         // initialize indexBuilder for the benefit of its tombstoneTracker, used by our reducing iterator
         indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-        Iterator<OnDiskAtom> iter = iterator();
-        while (iter.hasNext())
-            iter.next().updateDigest(digest);
+        while (merger.hasNext())
+            merger.next().updateDigest(digest);
         close();
     }
 
-    public AbstractType<?> getComparator()
-    {
-        return emptyColumnFamily.getComparator();
-    }
-
-    public Iterator<OnDiskAtom> iterator()
-    {
-        reducer = new Reducer();
-        Iterator<OnDiskAtom> iter = MergeIterator.get(rows, getComparator().onDiskAtomComparator, reducer);
-        return Iterators.filter(iter, Predicates.notNull());
-    }
-
     public ColumnStats columnStats()
     {
         return columnStats;
@@ -239,7 +245,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             }
             else
             {
-                ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge, controller, container);
+                boolean shouldPurge = container.getSortedColumns().iterator().next().timestamp() < maxPurgeableTimestamp;
+                ColumnFamily purged = removeDeletedAndOldShards(key, shouldPurge, controller, container);
                 if (purged == null || !purged.iterator().hasNext())
                 {
                     container.clear();
@@ -248,7 +255,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 Column reduced = purged.iterator().next();
                 container.clear();
 
-                // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
+                // removeDeletedAndOldShards has only checked the top-level CF deletion times,
                 // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                 if (indexBuilder.tombstoneTracker().isDeleted(reduced))
                 {
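
A sketch of the gcBefore override introduced in the LazilyCompactedRow constructor above, assuming hypothetical scalar inputs in place of the ColumnFamily and controller objects:

    // Hypothetical demo class: tombstone GC is only enabled when every version of
    // the partition's deletions predates maxPurgeableTimestamp; otherwise
    // removeDeleted still runs with Integer.MIN_VALUE, purely to drop redundant
    // row-level and range tombstones.
    public class GcBeforeDemo
    {
        static int overriddenGcBefore(long partitionMaxDeletionTimestamp, long maxPurgeableTimestamp, int gcBefore)
        {
            return partitionMaxDeletionTimestamp < maxPurgeableTimestamp ? gcBefore : Integer.MIN_VALUE;
        }

        public static void main(String[] args)
        {
            int gcBefore = 1_000_000;
            // Safe to purge: no overlapping sstable is older than our newest tombstone.
            System.out.println(overriddenGcBefore(40L, 50L, gcBefore)); // 1000000
            // Unsafe: data in an overlapping sstable may still need the tombstone.
            System.out.println(overriddenGcBefore(60L, 50L, gcBefore)); // -2147483648
        }
    }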

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
deleted file mode 100644
index 8a74fea..0000000
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.*;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.utils.*;
-
-/**
- * A class to run compaction taking advantage of multiple-core processes:
- *
- * One Deserializer thread per input sstable performs read + deserialize (a row at a time).
- * The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
- *
- * The merge Reducer creates MergeTasks on a thread-per-core Executor, and returns AsyncPrecompactedRow objects.
- *
- * The main complication is in handling larger-than-memory rows.  When one is encountered, no further deserialization
- * is done until that row is merged and written -- creating a pipeline stall, as it were.  Thus, this is intended
- * to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional exceptions.
- */
-public class ParallelCompactionIterable extends AbstractCompactionIterable
-{
-    private static final Logger logger = LoggerFactory.getLogger(ParallelCompactionIterable.class);
-
-    private final int maxInMemorySize;
-
-    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
-    {
-        this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
-    }
-
-    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize)
-    {
-        super(controller, type, scanners);
-        this.maxInMemorySize = maxInMemorySize;
-    }
-
-    public CloseableIterator<AbstractCompactedRow> iterator()
-    {
-        List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size());
-        for (ICompactionScanner scanner : scanners)
-            sources.add(new Deserializer(scanner, maxInMemorySize));
-        return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()));
-    }
-
-    private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
-    {
-        private final CloseableIterator<CompactedRowContainer> reducer;
-
-        public Unwrapper(CloseableIterator<CompactedRowContainer> reducer)
-        {
-            this.reducer = reducer;
-        }
-
-        protected AbstractCompactedRow computeNext()
-        {
-            if (!reducer.hasNext())
-                return endOfData();
-
-            CompactedRowContainer container = reducer.next();
-            AbstractCompactedRow compactedRow;
-            compactedRow = container.future == null
-                         ? container.row
-                         : new PrecompactedRow(container.key, FBUtilities.waitOnFuture(container.future));
-
-            return compactedRow;
-        }
-
-        public void close() throws IOException
-        {
-            reducer.close();
-        }
-    }
-
-    private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
-    {
-        private final List<RowContainer> rows = new ArrayList<RowContainer>();
-
-        private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
-                                                                                     Integer.MAX_VALUE,
-                                                                                     TimeUnit.MILLISECONDS,
-                                                                                     new SynchronousQueue<Runnable>(),
-                                                                                     new NamedThreadFactory("CompactionReducer"));
-
-        public void reduce(RowContainer current)
-        {
-            rows.add(current);
-        }
-
-        protected CompactedRowContainer getReduced()
-        {
-            assert rows.size() > 0;
-
-            ParallelCompactionIterable.this.updateCounterFor(rows.size());
-            CompactedRowContainer compacted = getCompactedRow(rows);
-            rows.clear();
-            long n = 0;
-            for (ICompactionScanner scanner : scanners)
-                n += scanner.getCurrentPosition();
-            bytesRead = n;
-            return compacted;
-        }
-
-        public CompactedRowContainer getCompactedRow(List<RowContainer> rows)
-        {
-            boolean inMemory = true;
-            for (RowContainer container : rows)
-            {
-                if (container.row == null)
-                {
-                    inMemory = false;
-                    break;
-                }
-            }
-
-            if (inMemory)
-            {
-                // caller will re-use rows List, so make ourselves a copy
-                List<Row> rawRows = new ArrayList<Row>(rows.size());
-                for (RowContainer rowContainer : rows)
-                    rawRows.add(rowContainer.row);
-                return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(rawRows)));
-            }
-
-            List<OnDiskAtomIterator> iterators = new ArrayList<OnDiskAtomIterator>(rows.size());
-            for (RowContainer container : rows)
-                iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
-            return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
-        }
-
-        public void close()
-        {
-            executor.shutdown();
-        }
-
-        /**
-         * Merges a set of in-memory rows
-         */
-        private class MergeTask implements Callable<ColumnFamily>
-        {
-            private final List<Row> rows;
-
-            public MergeTask(List<Row> rows)
-            {
-                this.rows = rows;
-            }
-
-            public ColumnFamily call() throws Exception
-            {
-                final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
-                List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size());
-                for (Row row : rows)
-                {
-                    returnCF.delete(row.cf);
-                    data.add(FBUtilities.closeableIterator(row.cf.iterator()));
-                }
-
-                PrecompactedRow.merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).key));
-                return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).key, controller, returnCF);
-            }
-        }
-
-        private class DeserializedColumnIterator implements OnDiskAtomIterator
-        {
-            private final Row row;
-            private final Iterator<Column> iter;
-
-            public DeserializedColumnIterator(Row row)
-            {
-                this.row = row;
-                iter = row.cf.iterator();
-            }
-
-            public ColumnFamily getColumnFamily()
-            {
-                return row.cf;
-            }
-
-            public DecoratedKey getKey()
-            {
-                return row.key;
-            }
-
-            public void close() throws IOException {}
-
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public OnDiskAtom next()
-            {
-                return iter.next();
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-        }
-    }
-
-    private static class Deserializer extends AbstractIterator<RowContainer> implements CloseableIterator<RowContainer>
-    {
-        private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
-        private static final RowContainer finished = new RowContainer((Row) null);
-        private final ICompactionScanner scanner;
-
-        public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
-        {
-            this.scanner = ssts;
-            Runnable runnable = new WrappedRunnable()
-            {
-                protected void runMayThrow() throws Exception
-                {
-                    SimpleCondition condition = null;
-                    while (true)
-                    {
-                        if (condition != null)
-                        {
-                            condition.await();
-                            condition = null;
-                        }
-                        if (!scanner.hasNext())
-                        {
-                            queue.put(finished);
-                            break;
-                        }
-
-                        SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
-                        if (iter.dataSize > maxInMemorySize)
-                        {
-                            logger.debug("parallel lazy deserialize from {}", iter.getPath());
-                            condition = new SimpleCondition();
-                            queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
-                        }
-                        else
-                        {
-                            logger.debug("parallel eager deserialize from {}", iter.getPath());
-                            queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory))));
-                        }
-                    }
-                }
-            };
-            new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start();
-        }
-
-        protected RowContainer computeNext()
-        {
-            RowContainer container;
-            try
-            {
-                container = queue.take();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-            return container == finished ? endOfData() : container;
-        }
-
-        public void close() throws IOException
-        {
-            scanner.close();
-        }
-    }
-
-    /**
-     * a wrapper around SSTII that notifies the given condition when it is closed
-     */
-    private static class NotifyingSSTableIdentityIterator implements OnDiskAtomIterator
-    {
-        private final SSTableIdentityIterator wrapped;
-        private final SimpleCondition condition;
-
-        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, SimpleCondition condition)
-        {
-            this.wrapped = wrapped;
-            this.condition = condition;
-        }
-
-        public ColumnFamily getColumnFamily()
-        {
-            return wrapped.getColumnFamily();
-        }
-
-        public DecoratedKey getKey()
-        {
-            return wrapped.getKey();
-        }
-
-        public void close() throws IOException
-        {
-            try
-            {
-                wrapped.close();
-            }
-            finally
-            {
-                condition.signalAll();
-            }
-        }
-
-        public boolean hasNext()
-        {
-            return wrapped.hasNext();
-        }
-
-        public OnDiskAtom next()
-        {
-            return wrapped.next();
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private static class RowContainer
-    {
-        // either row is not null, or wrapper is not null.  But not both.
-        public final Row row;
-        public final NotifyingSSTableIdentityIterator wrapper;
-        public static final Comparator<RowContainer> comparator = new Comparator<RowContainer>()
-        {
-            public int compare(RowContainer o1, RowContainer o2)
-            {
-                return o1.getKey().compareTo(o2.getKey());
-            }
-        };
-
-        private RowContainer(Row row)
-        {
-            this.row = row;
-            wrapper = null;
-        }
-
-        public RowContainer(NotifyingSSTableIdentityIterator wrapper)
-        {
-            this.wrapper = wrapper;
-            row = null;
-        }
-
-        public DecoratedKey getKey()
-        {
-            return row == null ? wrapper.getKey() : row.key;
-        }
-    }
-
-    private static class CompactedRowContainer
-    {
-        public final DecoratedKey key;
-        /** either "future" or "row" will be not-null, but not both at once. */
-        public final Future<ColumnFamily> future;
-        public final LazilyCompactedRow row;
-
-        private CompactedRowContainer(DecoratedKey key, Future<ColumnFamily> future)
-        {
-            this.key = key;
-            this.future = future;
-            row = null;
-        }
-
-        private CompactedRowContainer(LazilyCompactedRow row)
-        {
-            this.row = row;
-            future = null;
-            key = null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
deleted file mode 100644
index 15ae0b8..0000000
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.compaction;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.io.sstable.ColumnStats;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MergeIterator;
-
-/**
- * PrecompactedRow merges its rows in its constructor in memory.
- */
-public class PrecompactedRow extends AbstractCompactedRow
-{
-    private final ColumnFamily compactedCf;
-
-    /** it is caller's responsibility to call removeDeleted + removeOldShards from the cf before calling this constructor */
-    public PrecompactedRow(DecoratedKey key, ColumnFamily cf)
-    {
-        super(key);
-        compactedCf = cf;
-    }
-
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
-    {
-        assert key != null;
-        assert controller != null;
-        assert cf != null;
-
-        // avoid calling shouldPurge unless we actually need to: it can be very expensive if LCS
-        // gets behind and has hundreds of overlapping L0 sstables.  Essentially, this method is an
-        // ugly refactor of removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf),
-        // taking this into account.
-        Boolean shouldPurge = null;
-
-        if (cf.hasIrrelevantData(controller.gcBefore))
-            shouldPurge = controller.shouldPurge(key, cf.maxTimestamp());
-
-        // We should only gc tombstones if shouldPurge == true. But otherwise,
-        // it is still ok to collect columns that are shadowed by their (deleted)
-        // container, which removeDeleted(cf, Integer.MIN_VALUE) will do
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
-
-        if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-        {
-            if (shouldPurge == null)
-                shouldPurge = controller.shouldPurge(key, cf.deletionInfo().maxTimestamp());
-            if (shouldPurge)
-                CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        }
-
-        return compacted;
-    }
-
-    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
-    {
-        // See comment in preceding method
-        ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf,
-                                                                 shouldPurge ? controller.gcBefore : Integer.MIN_VALUE,
-                                                                 controller.cfs.indexManager.updaterFor(key));
-        if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
-        return compacted;
-    }
-
-    public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
-    {
-        this(rows.get(0).getKey(),
-             removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller)));
-    }
-
-    private static ColumnFamily merge(List<SSTableIdentityIterator> rows, CompactionController controller)
-    {
-        assert !rows.isEmpty();
-
-        final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
-
-        // transform into iterators that MergeIterator will like, and apply row-level tombstones
-        List<CloseableIterator<Column>> data = new ArrayList<>(rows.size());
-        for (SSTableIdentityIterator row : rows)
-        {
-            ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory);
-            returnCF.delete(cf);
-            data.add(FBUtilities.closeableIterator(cf.iterator()));
-        }
-
-        merge(returnCF, data, controller.cfs.indexManager.updaterFor(rows.get(0).getKey()));
-
-        return returnCF;
-    }
-
-    // returnCF should already have row-level tombstones applied
-    public static void merge(final ColumnFamily returnCF, List<CloseableIterator<Column>> data, final SecondaryIndexManager.Updater indexer)
-    {
-        IDiskAtomFilter filter = new IdentityQueryFilter();
-        Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
-
-        MergeIterator.Reducer<Column, Column> reducer = new MergeIterator.Reducer<Column, Column>()
-        {
-            ColumnFamily container = returnCF.cloneMeShallow();
-
-            public void reduce(Column column)
-            {
-                container.addColumn(column);
-
-                // skip the index-update checks if there is no indexing needed since they are a bit expensive
-                if (indexer == SecondaryIndexManager.nullUpdater)
-                    return;
-
-                // notify the index that the column has been overwritten if the value being reduced has been
-                // superseded by another directly, or indirectly by a range tombstone
-                if ((!column.isMarkedForDelete(System.currentTimeMillis()) && !container.getColumn(column.name()).equals(column))
-                    || returnCF.deletionInfo().isDeleted(column.name(), CompactionManager.NO_GC))
-                {
-                    indexer.remove(column);
-                }
-            }
-
-            protected Column getReduced()
-            {
-                Column c = container.iterator().next();
-                container.clear();
-                return c;
-            }
-        };
-
-        Iterator<Column> reduced = MergeIterator.get(data, fcomp, reducer);
-        filter.collectReducedColumns(returnCF, reduced, CompactionManager.NO_GC, System.currentTimeMillis());
-    }
-
-    public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
-    {
-        if (compactedCf == null)
-            return null;
-
-        return SSTableWriter.rawAppend(compactedCf, currentPosition, key, out);
-    }
-
-    public void update(MessageDigest digest)
-    {
-        assert compactedCf != null;
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            DeletionTime.serializer.serialize(compactedCf.deletionInfo().getTopLevelDeletion(), buffer);
-            digest.update(buffer.getData(), 0, buffer.getLength());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        compactedCf.updateDigest(digest);
-    }
-
-    public ColumnStats columnStats()
-    {
-        return compactedCf.getColumnStats();
-    }
-
-    /**
-     * @return the full column family represented by this compacted row.
-     *
-     * We do not provide this method for other AbstractCompactedRow implementations, because it requires
-     * fitting the whole row into memory and so does not make sense for them.
-     */
-    public ColumnFamily getFullColumnFamily()
-    {
-        return compactedCf;
-    }
-
-    public void close() { }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index d0aafa4..a14ab43 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -96,9 +96,9 @@ public class SSTableSplitter {
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }
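
For context, the trunk change here swaps the boolean shouldPurge hook for
maxPurgeableTimestamp: a tombstone may only be dropped if it is older than the
timestamp the controller returns, so Long.MIN_VALUE disables purging entirely,
which is what splitting wants. A hedged sketch of the caller-side contract
(tombstoneTimestamp is an illustrative variable, not code from this commit):

    long maxPurgeable = controller.maxPurgeableTimestamp(key); // Long.MIN_VALUE here
    boolean purgeable = tombstoneTimestamp < maxPurgeable;     // never true while splitting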

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index e435c24..97daff4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         public int compare(Row r1, Row r2)
          {
              return r1.key.compareTo(r2.key);
          }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                     if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
         if (!outOfOrderRows.isEmpty())
         {
             SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-            for (AbstractCompactedRow row : outOfOrderRows)
-                inOrderWriter.append(row);
+            for (Row row : outOfOrderRows)
+                inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
             outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
         }
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+        // TODO bitch if the row is too large?  if it is, there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+        // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;
@@ -356,9 +370,9 @@ public class Scrubber implements Closeable
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }
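
The TMBSC in the comment above is TreeMapBackedSortedColumns. The point of
cloning into it is that a TreeMap-backed ColumnFamily re-sorts cells on
insertion, so even a corrupt row whose cells are themselves mis-sorted can be
buffered safely. A hedged illustration (makeCell is a hypothetical helper,
not part of this patch):

    ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
    cf.addAtom(makeCell("b"));  // arrived first on disk
    cf.addAtom(makeCell("a"));  // out of order, but the TreeMap re-sorts it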

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index fa21765..b98c2ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -151,9 +151,9 @@ public class Upgrader
         }
 
         @Override
-        public boolean shouldPurge(DecoratedKey key, long delTimestamp)
+        public long maxPurgeableTimestamp(DecoratedKey key)
         {
-            return false;
+            return Long.MIN_VALUE;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 1fa2912..e7b9e11 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -71,7 +71,7 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
-        assert keys != null && keys.size() > 0;
+        assert keys.size() > 0;
         assert keys.size() == positions.size();
 
         Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This writes the atom to disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
index 48c0b3c..18e637b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
@@ -39,6 +39,8 @@ import org.apache.cassandra.Util;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+import static org.junit.Assert.assertFalse;
+
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 
@@ -144,45 +146,50 @@ public class CompactionsPurgeTest extends SchemaLoader
         // verify that minor compaction still GC when key is present
         // in a non-compacted sstable but the timestamp ensures we won't miss anything
         cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key1, cfName, System.currentTimeMillis()));
-        Assert.assertEquals(1, cf.getColumnCount());
+        assertEquals(1, cf.getColumnCount());
     }
 
+    /**
+     * verify that we don't drop tombstones that might still be relevant during a minor compaction
+     */
     @Test
     public void testMinTimestampPurge() throws IOException, ExecutionException, InterruptedException
     {
-        // verify that we don't drop tombstones during a minor compaction that might still be relevant
         CompactionManager.instance.disableAutoCompaction();
+
         Keyspace keyspace = Keyspace.open(KEYSPACE2);
         String cfName = "Standard1";
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
         RowMutation rm;
         DecoratedKey key3 = Util.dk("key3");
+
         // inserts
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.add(cfName, ByteBufferUtil.bytes("c1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
         rm.add(cfName, ByteBufferUtil.bytes("c2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 8);
         rm.apply();
         cfs.forceBlockingFlush();
-        // deletes
+        // delete c1
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.delete(cfName, ByteBufferUtil.bytes("c1"), 10);
         rm.apply();
         cfs.forceBlockingFlush();
         Collection<SSTableReader> sstablesIncomplete = cfs.getSSTables();
 
-        // delete so we have new delete in a diffrent SST.
+        // delete c2 so we have the new delete in a different SSTable
         rm = new RowMutation(KEYSPACE2, key3.key);
         rm.delete(cfName, ByteBufferUtil.bytes("c2"), 9);
         rm.apply();
         cfs.forceBlockingFlush();
+
+        // compact the sstables with the c1/c2 data and the c1 tombstone
         cfs.getCompactionStrategy().getUserDefinedTask(sstablesIncomplete, Integer.MAX_VALUE).execute(null);
 
-        // we should have both the c1 and c2 tombstones still, since the c2 timestamp is older than the c1 tombstone
-        // so it would be invalid to assume we can throw out the c1 entry.
+        // We should have both the c1 and c2 tombstones still. Since the min timestamp in the c2 tombstone
+        // sstable is older than the c1 tombstone, it is invalid to throw out the c1 tombstone.
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key3, cfName, System.currentTimeMillis()));
-        Assert.assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
-        Assert.assertEquals(2, cf.getColumnCount());
+        assertFalse(cf.getColumn(ByteBufferUtil.bytes("c2")).isLive(System.currentTimeMillis()));
+        assertEquals(2, cf.getColumnCount());
     }
 
     @Test
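
To make the invariant in testMinTimestampPurge concrete, the test's
timestamps work out as follows (a restatement of the code above, not new
behavior):

    ts 8  : insert c1, c2      -> sstable A (in the compaction set)
    ts 10 : tombstone for c1   -> sstable B (in the compaction set)
    ts 9  : tombstone for c2   -> sstable C (flushed later, NOT in the set)

Sstable C's minimum timestamp (9) is older than the c1 tombstone (10), so data
shadowed by that tombstone could still exist outside the compaction set;
dropping it would be unsafe even with gcBefore = Integer.MAX_VALUE. Hence the
test asserts that both tombstones survive.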
@@ -279,16 +286,13 @@ public class CompactionsPurgeTest extends SchemaLoader
         String cfName = "Standard1";
         Keyspace keyspace = Keyspace.open(keyspaceName);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
-
         DecoratedKey key = Util.dk("key3");
         RowMutation rm;
 
         // inserts
         rm = new RowMutation(keyspaceName, key.key);
         for (int i = 0; i < 10; i++)
-        {
             rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
-        }
         rm.apply();
 
         // deletes row with timestamp such that not all columns are deleted
@@ -303,12 +307,10 @@ public class CompactionsPurgeTest extends SchemaLoader
         // re-inserts with timestamp lower than delete
         rm = new RowMutation(keyspaceName, key.key);
         for (int i = 0; i < 5; i++)
-        {
             rm.add(cfName, ByteBufferUtil.bytes(String.valueOf(i)), ByteBufferUtil.EMPTY_BYTE_BUFFER, i);
-        }
         rm.apply();
 
-        // Check that the second insert did went in
+        // Check that the second insert went in
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
         assertEquals(10, cf.getColumnCount());
         for (Column c : cf)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
deleted file mode 100644
index 1c286c8..0000000
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import com.google.common.base.Objects;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.MappedFileDataInput;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CloseableIterator;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class LazilyCompactedRowTest extends SchemaLoader
-{
-    private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
-    {
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-
-        // compare eager and lazy compactions
-        AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
-                                                                  strategy.getScanners(sstables),
-                                                                  new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
-                                                                 strategy.getScanners(sstables),
-                                                                 new LazilyCompactingController(cfs, sstables, gcBefore));
-        assertBytes(cfs, eager, lazy);
-
-        // compare eager and parallel-lazy compactions
-        eager = new CompactionIterable(OperationType.UNKNOWN,
-                                       strategy.getScanners(sstables),
-                                       new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
-                                                                             strategy.getScanners(sstables),
-                                                                             new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
-                                                                             0);
-        assertBytes(cfs, eager, parallel);
-    }
-
-    private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
-    {
-        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
-        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
-        while (true)
-        {
-            if (!iter1.hasNext())
-            {
-                assert !iter2.hasNext();
-                break;
-            }
-
-            AbstractCompactedRow row1 = iter1.next();
-            AbstractCompactedRow row2 = iter2.next();
-            DataOutputBuffer out1 = new DataOutputBuffer();
-            DataOutputBuffer out2 = new DataOutputBuffer();
-            row1.write(-1, out1);
-            row2.write(-1, out2);
-
-            File tmpFile1 = File.createTempFile("lcrt1", null);
-            File tmpFile2 = File.createTempFile("lcrt2", null);
-
-            tmpFile1.deleteOnExit();
-            tmpFile2.deleteOnExit();
-
-            try (OutputStream os1 = new FileOutputStream(tmpFile1);
-                 OutputStream os2 = new FileOutputStream(tmpFile2))
-            {
-                os1.write(out1.getData()); // writing data from row1
-                os2.write(out2.getData()); // writing data from row2
-            }
-
-            try (MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
-                 MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0))
-            {
-                // row key
-                assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
-    
-                // cf metadata
-                ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
-                ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
-                cf1.delete(DeletionTime.serializer.deserialize(in1));
-                cf2.delete(DeletionTime.serializer.deserialize(in2));
-                assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
-                // columns
-                while (true)
-                {
-                    Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
-                    Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
-                    assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
-                    if (c1 == null)
-                        break;
-                }
-                // that should be everything
-                assert in1.available() == 0;
-                assert in2.available() == 0;
-            }
-        }
-    }
-
-    private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws NoSuchAlgorithmException
-    {
-        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
-        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
-        CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
-        CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
-
-        while (true)
-        {
-            if (!iter1.hasNext())
-            {
-                assert !iter2.hasNext();
-                break;
-            }
-
-            AbstractCompactedRow row1 = iter1.next();
-            AbstractCompactedRow row2 = iter2.next();
-            MessageDigest digest1 = MessageDigest.getInstance("MD5");
-            MessageDigest digest2 = MessageDigest.getInstance("MD5");
-
-            row1.update(digest1);
-            row2.update(digest2);
-
-            assert MessageDigest.isEqual(digest1.digest(), digest2.digest());
-        }
-    }
-
-    @Test
-    public void testOneRow() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testOneRowTwoColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testOneRowManyColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBuffer.wrap("k".getBytes());
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        for (int i = 0; i < 1000; i++)
-            rm.add("Standard1", ByteBufferUtil.bytes(i), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        DataOutputBuffer out = new DataOutputBuffer();
-        RowMutation.serializer.serialize(rm, out, MessagingService.current_version);
-        assert out.getLength() > DatabaseDescriptor.getColumnIndexSize();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testTwoRows() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testTwoRowsTwoColumns() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        ByteBuffer key = ByteBufferUtil.bytes("k");
-        RowMutation rm = new RowMutation("Keyspace1", key);
-        rm.add("Standard1", ByteBufferUtil.bytes("c"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.add("Standard1", ByteBufferUtil.bytes("d"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        rm.apply();
-        cfs.forceBlockingFlush();
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    @Test
-    public void testManyRows() throws IOException, NoSuchAlgorithmException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
-
-        final int ROWS_PER_SSTABLE = 10;
-        for (int j = 0; j < (cfs.metadata.getIndexInterval() * 3) / ROWS_PER_SSTABLE; j++)
-        {
-            for (int i = 0; i < ROWS_PER_SSTABLE; i++)
-            {
-                ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(i % 2));
-                RowMutation rm = new RowMutation("Keyspace1", key);
-                rm.add("Standard1", ByteBufferUtil.bytes(String.valueOf(i / 2)), ByteBufferUtil.EMPTY_BYTE_BUFFER, j * ROWS_PER_SSTABLE + i);
-                rm.apply();
-            }
-            cfs.forceBlockingFlush();
-        }
-
-        assertBytes(cfs, Integer.MAX_VALUE);
-        assertDigest(cfs, Integer.MAX_VALUE);
-    }
-
-    private static class LazilyCompactingController extends CompactionController
-    {
-        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
-        {
-            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
-        }
-
-        @Override
-        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-        {
-            return new LazilyCompactedRow(this, rows);
-        }
-    }
-
-    private static class PreCompactingController extends CompactionController
-    {
-        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
-        {
-            super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
-        }
-
-        @Override
-        public AbstractCompactedRow getCompactedRow(List<SSTableIdentityIterator> rows)
-        {
-            return new PrecompactedRow(this, rows);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36cdf34b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 9fa5d89..757254c 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.repair;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.security.MessageDigest;
 import java.util.UUID;
 
 import org.junit.After;
@@ -27,11 +30,12 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.TreeMapBackedSortedColumns;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ColumnStats;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -43,11 +47,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.SimpleCondition;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class ValidatorTest extends SchemaLoader
 {
@@ -109,8 +109,7 @@ public class ValidatorTest extends SchemaLoader
 
         // add a row
         Token mid = partitioner.midpoint(range.left, range.right);
-        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
-                                                 TreeMapBackedSortedColumns.factory.create(cfs.metadata)));
+        validator.add(new CompactedRowStub(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!"))));
         validator.complete();
 
         // confirm that the tree was validated
@@ -121,6 +120,28 @@ public class ValidatorTest extends SchemaLoader
             lock.await();
     }
 
+    private static class CompactedRowStub extends AbstractCompactedRow
+    {
+        private CompactedRowStub(DecoratedKey key)
+        {
+            super(key);
+        }
+
+        public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void update(MessageDigest digest) { }
+
+        public ColumnStats columnStats()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public void close() throws IOException { }
+    }
+
     @Test
     public void testValidatorFailed() throws Throwable
     {


[06/10] git commit: Fixes for compacting larger-than-memory rows patch by jbellis; reviewed by marcuse for CASSANDRA-6274

Posted by jb...@apache.org.
Fixes for compacting larger-than-memory rows
patch by jbellis; reviewed by marcuse for CASSANDRA-6274


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db

Branch: refs/heads/cassandra-2.0
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  22 ++++----
 .../db/compaction/AbstractCompactedRow.java     |   4 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/LazilyCompactedRow.java       |  34 ++++++++----
 .../cassandra/db/compaction/Scrubber.java       |  52 ++++++++++++-------
 .../cassandra/io/sstable/SSTableWriter.java     |   2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
  * Compact hottest sstables first and optionally omit coldest from
    compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
             }
             ColumnIndex index = build();
 
-            finish();
+            maybeWriteEmptyRowHeader();
 
             return index;
         }
 
-        public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+        /**
+         * The important distinction wrt build() is that we may be building for a row that ends up
+         * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+         * columns shadowed by expired tombstones).  Thus, it is the caller's responsibility
+         * to decide whether to write the header for an empty row.
+         */
+        public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
         {
-            for (OnDiskAtom c : columns)
-                add(c);
-            ColumnIndex index = build();
-
-            finish();
-
-            return index;
+            while (columns.hasNext())
+                add(columns.next());
+            return build();
         }
 
         public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
             return result;
         }
 
-        public void finish() throws IOException
+        public void maybeWriteEmptyRowHeader() throws IOException
         {
             if (!deletionInfo.isLive())
                 maybeWriteRowHeader();
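
A hedged sketch of the resulting division of labor (builder, row and
rowIsIrrelevant are illustrative names; the real caller is
LazilyCompactedRow.write later in this commit):

    ColumnIndex index = builder.buildForCompaction(row.iterator());
    if (index.columnsIndex.isEmpty() && rowIsIrrelevant)
        return null;                        // row compacted away; write nothing
    builder.maybeWriteEmptyRowHeader();     // caller opts in to the empty-row header
    out.writeShort(SSTableWriter.END_OF_ROW);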

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
 
     /**
     * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
-     * are included in the compaction set
+     * older than @param maxDeletionTimestamp are included in the compaction set
      */
     public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
     {
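
A hedged sketch of what the documented check amounts to
(sstablesOutsideCompactionContaining is a hypothetical helper, not the
method's actual body):

    for (SSTableReader overlapping : sstablesOutsideCompactionContaining(key))
        if (overlapping.getMinTimestamp() <= maxDeletionTimestamp)
            return false;  // older versions of the row may live outside the set
    return true;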

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
+    private final ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private long maxTombstoneTimestamp;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +67,31 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
+        maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
+        // no other versions of this row present.
+        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(iterator());
             if (columnsIndex.columnsIndex.isEmpty())
             {
                 boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
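
The Long.MAX_VALUE argument is easiest to see with concrete (made-up) numbers:
suppose the row-level tombstones seen up front give maxTombstoneTimestamp = 5,
but a cell-level tombstone at ts 20 is only discovered while iterating, and an
sstable outside the compaction set holds data for this key at ts 10.
shouldPurge(key, 5) could return true, and we would wrongly drop the ts-20
tombstone that still shadows the ts-10 data. shouldPurge(key, Long.MAX_VALUE)
only returns true when no other version of the row exists at all, which is
safe no matter when a tombstone is discovered.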

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-         public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+         public int compare(Row r1, Row r2)
          {
              return r1.key.compareTo(r2.key);
          }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@ public class Scrubber implements Closeable
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                     if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@ public class Scrubber implements Closeable
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@ public class Scrubber implements Closeable
         if (!outOfOrderRows.isEmpty())
         {
             SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-            for (AbstractCompactedRow row : outOfOrderRows)
-                inOrderWriter.append(row);
+            for (Row row : outOfOrderRows)
+                inOrderWriter.append(row.key, row.cf);
             newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
             outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
         }
@@ -294,6 +293,21 @@ public class Scrubber implements Closeable
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+        // TODO complain if the row is too large?  if it is, there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is the worst case for TreeMapBackedSortedColumns, but we shouldn't
+        // need to do this very often, and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;

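For readers tracking the control flow above: the scrubber now remembers only the previous key, sets aside any partition whose key sorts before it, and writes the salvaged partitions to a separate sstable in sorted order once the pass is done. Below is a minimal, self-contained sketch of that pattern; the String keys, TreeMap, and console output are illustrative stand-ins for DecoratedKey, TreeMapBackedSortedColumns, and the in-order writer, not the real APIs.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Self-contained sketch of the out-of-order handling above.  Plain String keys
// and a TreeMap are illustrative stand-ins for DecoratedKey,
// TreeMapBackedSortedColumns and the separate in-order writer.
public class OutOfOrderScrubSketch
{
    private final List<String> goodRows = new ArrayList<>();                // rewritten in place
    private final TreeMap<String, String> outOfOrderRows = new TreeMap<>(); // salvaged, kept sorted
    private String prevKey;

    public void scrub(Map<String, String> rowsInFileOrder)
    {
        for (Map.Entry<String, String> row : rowsInFileOrder.entrySet())
        {
            String key = row.getKey();
            if (prevKey != null && prevKey.compareTo(key) > 0)
            {
                // key sorts before its predecessor: set it aside instead of aborting
                outOfOrderRows.put(key, row.getValue());
                continue;
            }
            goodRows.add(key);
            prevKey = key;
        }
        // TreeMap iteration is already sorted, mirroring the in-order sstable
        // the scrubber writes for the salvaged rows at the end of the pass.
        System.out.println("in order: " + goodRows + ", salvaged: " + outOfOrderRows.keySet());
    }

    public static void main(String[] args)
    {
        Map<String, String> rows = new LinkedHashMap<>();
        rows.put("a", "1");
        rows.put("c", "3");
        rows.put("b", "2"); // sorts before "c": will be salvaged
        new OutOfOrderScrubSketch().scrub(rows);
        // prints: in order: [a, c], salvaged: [b]
    }
}

The salvage buffer is sorted on insertion, so flushing it requires no extra sort; that is the same reason saveOutOfOrderRow copies atoms into a TreeMap-backed column family even though insertion in sorted order is the TreeMap's worst case.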
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This write the atom on disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)

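The rename from finish() to maybeWriteEmptyRowHeader() reads as: a row that produced no atoms still needs its header (key and deletion info) on disk before the END_OF_ROW marker that follows. Here is a hypothetical sketch of that guard, under that assumption; the class and method names are illustrative, not the real ColumnIndex API.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch, not the real ColumnIndex API: assumes the indexer lazily
// writes the row header with the first atom, and must flush it explicitly for
// rows that turn out to be empty.
public class EmptyRowHeaderSketch
{
    private final DataOutputStream out;
    private boolean headerWritten;

    public EmptyRowHeaderSketch(DataOutputStream out)
    {
        this.out = out;
    }

    public void add(byte[] atom) throws IOException
    {
        writeHeaderIfNeeded(); // header goes out just before the first atom
        out.write(atom);
    }

    // Called once per row after all atoms: if nothing forced the header out,
    // write it now so even an empty row is well-formed before END_OF_ROW.
    public void maybeWriteEmptyRowHeader() throws IOException
    {
        writeHeaderIfNeeded();
    }

    private void writeHeaderIfNeeded() throws IOException
    {
        if (headerWritten)
            return;
        out.writeShort(0x1234); // stand-in for serializing key + deletion info
        headerWritten = true;
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        EmptyRowHeaderSketch indexer = new EmptyRowHeaderSketch(new DataOutputStream(buf));
        indexer.maybeWriteEmptyRowHeader(); // empty row: header alone is written
        System.out.println(buf.size() + " header bytes for the empty row"); // prints 2
    }
}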
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ


[02/10] git commit: add generation back to bucketing (filtering) sort

Posted by jb...@apache.org.
add generation back to bucketing (filtering) sort


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a944940
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a944940
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a944940

Branch: refs/heads/trunk
Commit: 9a9449406ede4e9efcd14de0fbc5c0e43272cee5
Parents: 3728530
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:36:10 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500

----------------------------------------------------------------------
 .../db/compaction/SizeTieredCompactionStrategy.java      | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a944940/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 5115860..09d4e8e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -90,7 +90,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     @VisibleForTesting
     static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
     {
-        // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+        // sort the sstables by hotness (coldest-first)
         Collections.sort(sstables, new Comparator<SSTableReader>()
         {
             public int compare(SSTableReader o1, SSTableReader o2)
@@ -99,7 +99,14 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 if (comparison != 0)
                     return comparison;
 
-                return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+                // break ties with size on disk (mainly for system tables and cold tables)
+                comparison = Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+                if (comparison != 0)
+                    return comparison;
+
+                // if there's still a tie, use generation, which is guaranteed to be unique.  This ensures that
+                // our filtering is deterministic, which can be useful when debugging.
+                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
             }
         });
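To make the three-level ordering concrete, here is a standalone sketch of the same coldest-first sort; the Table class and its fields are illustrative stand-ins for SSTableReader and its hotness, size, and generation accessors, not the real API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative stand-in for SSTableReader: just the three fields the
// comparator above consults.
public class ColdestFirstSortSketch
{
    static final class Table
    {
        final double readsPerSecond; // hotness proxy
        final long bytesOnDisk;
        final int generation;        // unique per sstable, so the order is total

        Table(double readsPerSecond, long bytesOnDisk, int generation)
        {
            this.readsPerSecond = readsPerSecond;
            this.bytesOnDisk = bytesOnDisk;
            this.generation = generation;
        }
    }

    // Coldest first, ties broken by size on disk, then by the unique
    // generation so the result is deterministic across runs.
    static final Comparator<Table> COLDEST_FIRST =
        Comparator.comparingDouble((Table t) -> t.readsPerSecond)
                  .thenComparingLong(t -> t.bytesOnDisk)
                  .thenComparingInt(t -> t.generation);

    public static void main(String[] args)
    {
        List<Table> tables = new ArrayList<>();
        tables.add(new Table(0.0, 1024, 7));
        tables.add(new Table(0.0, 1024, 3)); // same hotness and size: generation decides
        tables.add(new Table(5.0, 2048, 1));
        tables.sort(COLDEST_FIRST);
        for (Table t : tables)
            System.out.print(t.generation + " "); // prints 3 7 1
        System.out.println();
    }
}

Because generation is unique per sstable, the comparator never returns 0 for two distinct tables, so repeated runs over the same set always filter the same sstables.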