Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:31 UTC

[3/3] git commit: Preemptive open of compaction results

Preemptive open of compaction results

Patch by benedict; reviewed by marcuse for CASSANDRA-6916


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e95953f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e95953f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e95953f

Branch: refs/heads/cassandra-2.1
Commit: 4e95953f29d89a441dfe06d3f0393ed7dd8586df
Parents: b3a225e
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Wed Apr 23 15:16:09 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 16:25:02 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  15 +-
 .../apache/cassandra/cache/AutoSavingCache.java |   2 +-
 .../cassandra/cache/RefCountedMemory.java       |  15 +-
 .../org/apache/cassandra/config/CFMetaData.java |  78 +++--
 .../org/apache/cassandra/config/Config.java     |   4 +-
 .../cassandra/config/DatabaseDescriptor.java    |  15 +-
 .../cassandra/cql/AlterTableStatement.java      |  18 +-
 .../cql/CreateColumnFamilyStatement.java        |   3 +-
 .../cassandra/cql3/statements/CFPropDefs.java   |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +-
 .../org/apache/cassandra/db/DataTracker.java    |  15 +-
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/compaction/AbstractCompactionTask.java   |   4 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        |  91 +++--
 .../cassandra/db/compaction/CompactionTask.java | 143 +++-----
 .../db/compaction/LeveledCompactionTask.java    |   2 +-
 .../db/compaction/SSTableSplitter.java          |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  57 ++--
 .../SizeTieredCompactionStrategy.java           |   6 +-
 .../cassandra/db/compaction/Upgrader.java       |  43 +--
 .../io/compress/CompressedSequentialWriter.java |  26 +-
 .../io/compress/CompressionMetadata.java        | 181 +++++-----
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  49 ++-
 .../cassandra/io/sstable/IndexSummary.java      |  16 +-
 .../io/sstable/IndexSummaryBuilder.java         |  75 ++++-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 -
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableReader.java     | 119 +++++--
 .../cassandra/io/sstable/SSTableRewriter.java   | 330 +++++++++++++++++++
 .../cassandra/io/sstable/SSTableWriter.java     | 117 ++++++-
 .../io/sstable/metadata/MetadataCollector.java  |  18 +-
 .../io/sstable/metadata/MetadataSerializer.java |   2 +-
 .../io/util/BufferedPoolingSegmentedFile.java   |   5 +
 .../io/util/BufferedSegmentedFile.java          |   5 +
 .../io/util/ChecksummedSequentialWriter.java    |   6 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  15 +-
 .../io/util/CompressedSegmentedFile.java        |  24 +-
 .../org/apache/cassandra/io/util/FileUtils.java |   5 +
 .../org/apache/cassandra/io/util/Memory.java    |  32 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |  24 +-
 .../cassandra/io/util/PoolingSegmentedFile.java |   7 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  14 +-
 .../cassandra/io/util/SequentialWriter.java     |  52 +--
 .../cassandra/service/FileCacheService.java     |  94 ++++--
 .../cassandra/streaming/StreamLockfile.java     |  13 +-
 .../cassandra/tools/StandaloneScrubber.java     |  11 -
 .../cassandra/tools/StandaloneSplitter.java     |   4 -
 .../cassandra/tools/StandaloneUpgrader.java     |   3 -
 .../org/apache/cassandra/utils/CLibrary.java    |  59 ++--
 .../cassandra/utils/obs/OffHeapBitSet.java      |   5 +-
 .../db/compaction/LongCompactionsTest.java      |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  64 +++-
 .../apache/cassandra/db/DirectoriesTest.java    |  23 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  21 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  23 +-
 .../CompressedRandomAccessReaderTest.java       |  19 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../metadata/MetadataSerializerTest.java        |   5 +-
 .../cassandra/io/util/DataOutputTest.java       |   2 +-
 .../compress/CompressedInputStreamTest.java     |   2 +-
 65 files changed, 1345 insertions(+), 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 211e55c..d32a107 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,7 @@
  * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
  * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
  * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
+ * Preemptive opening of compaction results (CASSANDRA-6916)
 Merged from 2.0:
  * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
  * Log a warning for large batches (CASSANDRA-6487)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ed796b..2176bf9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -511,10 +511,11 @@ in_memory_compaction_limit_in_mb: 64
 # of compaction, including validation compaction.
 compaction_throughput_mb_per_sec: 16
 
-# Track cached row keys during compaction, and re-cache their new
-# positions in the compacted sstable.  Disable if you use really large
-# key caches.
-compaction_preheat_key_cache: true
+# When compacting, the replacement sstable(s) can be opened before they
+# are completely written, and used in place of the prior sstables for
+# any range that has been written. This helps transfer reads smoothly
+# between the sstables, reducing page cache churn and keeping hot rows hot.
+sstable_preemptive_open_interval_in_mb: 50
 
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
@@ -730,9 +731,3 @@ internode_compression: all
 # reducing overhead from the TCP protocol itself, at the cost of increasing
 # latency if you block for cross-datacenter responses.
 inter_dc_tcp_nodelay: false
-
-# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
-# When enabled it would preheat only first "page" (4KB) of each row to optimize
-# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
-# for further details on that topic.
-preheat_kernel_page_cache: false

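The new interval is a data-volume threshold rather than a timer: roughly, every
time another sstable_preemptive_open_interval_in_mb megabytes of a replacement
sstable have been written, the partially written result can be opened and
substituted for the corresponding range of the sstables being compacted. A
minimal sketch of that trigger logic, assuming an openEarly() hook on
SSTableWriter and the bookkeeping field shown (both illustrative, not verbatim
from this patch):

    private long lastOpenedAt = 0; // file position at the previous early open

    void maybeOpenEarly(SSTableWriter writer, DataTracker tracker,
                        List<SSTableReader> toBeReplaced, long maxDataAge)
    {
        long intervalBytes = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB() * 1024L * 1024L;
        if (writer.getFilePointer() - lastOpenedAt < intervalBytes)
            return;
        lastOpenedAt = writer.getFilePointer();
        // assumed to return null until enough of the sstable exists to build a usable reader
        SSTableReader early = writer.openEarly(maxDataAge);
        if (early != null)
            tracker.replaceReaders(toBeReplaced, Collections.singletonList(early));
    }
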
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7b9ae95..db79a15 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -258,7 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         {
             File path = getCachePath(pathInfo.keyspace, pathInfo.columnFamily, pathInfo.cfId, CURRENT_VERSION);
             File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
-            return SequentialWriter.open(tmpFile, true);
+            return SequentialWriter.open(tmpFile);
         }
 
         private void deleteOldCacheFiles()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/RefCountedMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RefCountedMemory.java b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
index 76d9b00..e5c543e 100644
--- a/src/java/org/apache/cassandra/cache/RefCountedMemory.java
+++ b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
@@ -51,6 +51,19 @@ public class RefCountedMemory extends Memory
     public void unreference()
     {
         if (UPDATER.decrementAndGet(this) == 0)
-            free();
+            super.free();
     }
+
+    public RefCountedMemory copy(long newSize)
+    {
+        RefCountedMemory copy = new RefCountedMemory(newSize);
+        copy.put(0, this, 0, Math.min(size(), newSize));
+        return copy;
+    }
+
+    public void free()
+    {
+        throw new AssertionError();
+    }
+
 }

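With free() overridden to throw, the only way to release the underlying
allocation is to drive the reference count to zero, so memory can no longer be
freed out from under other readers. A sketch of the intended lifecycle,
assuming the class's pre-existing reference()/unreference() pair:

    RefCountedMemory memory = new RefCountedMemory(1024); // created holding one reference
    if (memory.reference())       // pin before handing to another consumer
    {
        // ... consumer reads the memory ...
        memory.unreference();     // consumer done
    }
    memory.unreference();         // the final unreference() calls super.free()
    // calling memory.free() directly now throws AssertionError by design
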
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 72a0fc5..97fc241 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -21,7 +21,20 @@ import java.io.DataInput;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -37,16 +50,44 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.AtomDeserializer;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SuperColumns;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundCType;
+import org.apache.cassandra.db.composites.SimpleCType;
 import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -55,14 +96,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.apache.cassandra.utils.FBUtilities.*;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonList;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
+import static org.apache.cassandra.utils.FBUtilities.json;
 
 /**
  * This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
@@ -82,7 +125,6 @@ public final class CFMetaData
     public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
     public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
     public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
-    public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;
 
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
@@ -397,7 +439,6 @@ public final class CFMetaData
     private volatile int memtableFlushPeriod = 0;
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
-    private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
     private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
     private volatile boolean isPurged = false;
@@ -443,7 +484,6 @@ public final class CFMetaData
     public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
-    public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
     public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 
@@ -621,7 +661,6 @@ public final class CFMetaData
                       .maxIndexInterval(oldCFMD.maxIndexInterval)
                       .speculativeRetry(oldCFMD.speculativeRetry)
                       .memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
-                      .populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
                       .triggers(new HashMap<>(oldCFMD.triggers))
                       .rebuild();
@@ -673,11 +712,6 @@ public final class CFMetaData
         return ReadRepairDecision.NONE;
     }
 
-    public boolean populateIoCacheOnFlush()
-    {
-        return populateIoCacheOnFlush;
-    }
-
     public int getGcGraceSeconds()
     {
         return gcGraceSeconds;
@@ -880,7 +914,6 @@ public final class CFMetaData
             && Objects.equal(minIndexInterval, other.minIndexInterval)
             && Objects.equal(maxIndexInterval, other.maxIndexInterval)
             && Objects.equal(speculativeRetry, other.speculativeRetry)
-            && Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush)
             && Objects.equal(droppedColumns, other.droppedColumns)
             && Objects.equal(triggers, other.triggers);
     }
@@ -913,7 +946,6 @@ public final class CFMetaData
             .append(minIndexInterval)
             .append(maxIndexInterval)
             .append(speculativeRetry)
-            .append(populateIoCacheOnFlush)
             .append(droppedColumns)
             .append(triggers)
             .toHashCode();
@@ -930,8 +962,6 @@ public final class CFMetaData
     {
         if (!cf_def.isSetComment())
             cf_def.setComment("");
-        if (!cf_def.isSetPopulate_io_cache_on_flush())
-            cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
         if (!cf_def.isSetMin_compaction_threshold())
             cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
         if (!cf_def.isSetMax_compaction_threshold())
@@ -1023,7 +1053,5 @@
             if (cf_def.isSetSpeculative_retry())
                 newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
-            if (cf_def.isSetPopulate_io_cache_on_flush())
-                newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
             if (cf_def.isSetTriggers())
                 newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
 
@@ -1125,7 +1154,6 @@ public final class CFMetaData
         memtableFlushPeriod = cfm.memtableFlushPeriod;
         defaultTimeToLive = cfm.defaultTimeToLive;
         speculativeRetry = cfm.speculativeRetry;
-        populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;
 
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
@@ -1252,7 +1280,6 @@ public final class CFMetaData
         def.setComment(Strings.nullToEmpty(comment));
         def.setRead_repair_chance(readRepairChance);
         def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
-        def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
         def.setGc_grace_seconds(gcGraceSeconds);
         def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
         def.setKey_validation_class(keyValidator.toString());
@@ -1642,7 +1669,6 @@ public final class CFMetaData
         adder.add("comment", comment);
         adder.add("read_repair_chance", readRepairChance);
         adder.add("local_read_repair_chance", dcLocalReadRepairChance);
-        adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
         adder.add("gc_grace_seconds", gcGraceSeconds);
         adder.add("default_validator", defaultValidator.toString());
         adder.add("key_validator", keyValidator.toString());
@@ -1730,9 +1756,6 @@ public final class CFMetaData
             if (result.has("max_index_interval"))
                 cfm.maxIndexInterval(result.getInt("max_index_interval"));
 
-            if (result.has("populate_io_cache_on_flush"))
-                cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
-
             /*
              * The info previously hold by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
              * make more sense and 2) this allow to store indexing information).
@@ -2198,7 +2221,6 @@ public final class CFMetaData
             .append("minIndexInterval", minIndexInterval)
             .append("maxIndexInterval", maxIndexInterval)
             .append("speculativeRetry", speculativeRetry)
-            .append("populateIoCacheOnFlush", populateIoCacheOnFlush)
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers)
             .toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 3cf8ff8..7ab7a8c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -177,7 +177,7 @@ public class Config
     public int hinted_handoff_throttle_in_kb = 1024;
     public int batchlog_replay_throttle_in_kb = 1024;
     public int max_hints_delivery_threads = 1;
-    public boolean compaction_preheat_key_cache = true;
+    public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;
     public boolean trickle_fsync = false;
@@ -199,8 +199,6 @@ public class Config
 
     private static boolean isClientMode = false;
 
-    public boolean preheat_kernel_page_cache = false;
-
     public Integer file_cache_size_in_mb;
 
     public boolean inter_dc_tcp_nodelay = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ef2c4fc..4b0043c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1331,11 +1331,6 @@ public class DatabaseDescriptor
         return conf.max_hints_delivery_threads;
     }
 
-    public static boolean getPreheatKeyCache()
-    {
-        return conf.compaction_preheat_key_cache;
-    }
-
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;
@@ -1356,6 +1351,11 @@ public class DatabaseDescriptor
         return conf.commitlog_total_space_in_mb;
     }
 
+    public static int getSSTablePreemptiveOpenIntervalInMB()
+    {
+        return conf.sstable_preemptive_open_interval_in_mb;
+    }
+
     public static boolean getTrickleFsync()
     {
         return conf.trickle_fsync;
@@ -1476,11 +1476,6 @@ public class DatabaseDescriptor
         return conf.inter_dc_tcp_nodelay;
     }
 
-    public static boolean shouldPreheatPageCache()
-    {
-        return conf.preheat_kernel_page_cache;
-    }
-
     public static Pool getMemtableAllocatorPool()
     {
         long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;

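This getter completes the plumbing for the new knob: the yaml key, the Config
field, and the accessor are bound by name, so the three pieces line up as
follows (50 is the shipped default):

    // cassandra.yaml:           sstable_preemptive_open_interval_in_mb: 50
    // Config field (same name): public int sstable_preemptive_open_interval_in_mb = 50;
    int mb = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMB(); // 50 unless overridden
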
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 7af65a1..5bc7011 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -17,16 +17,21 @@
  */
 package org.apache.cassandra.cql;
 
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
-
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+
 public class AlterTableStatement
 {
     public static enum OperationType
@@ -183,7 +188,6 @@ public class AlterTableStatement
         cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
         cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
         cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
-        cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
         cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
         cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index b483451..4cb9eba 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -201,8 +201,7 @@ public class CreateColumnFamilyStatement
                    .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
                    .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
                    .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
-                   .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE))
-                   .populateIoCacheOnFlush(getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH));
+                   .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));
 
             // CQL2 can have null keyAliases
             if (keyAlias != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 95fb750..8df0106 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.CFMetaData;
@@ -26,7 +29,6 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
 
 public class CFPropDefs extends PropertyDefinitions
 {
@@ -183,7 +185,6 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
         cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
         cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
-        cfm.populateIoCacheOnFlush(getBoolean(KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
         cfm.minIndexInterval(getInt(KW_MIN_INDEX_INTERVAL, cfm.getMinIndexInterval()));
         cfm.maxIndexInterval(getInt(KW_MAX_INDEX_INTERVAL, cfm.getMaxIndexInterval()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ea49250..49cb47d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -487,7 +487,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();
 
-            if (desc.temporary)
+            if (desc.type.isTemporary)
             {
                 SSTable.delete(desc, components);
                 continue;
@@ -680,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             if (currentDescriptors.contains(descriptor))
                 continue; // old (initialized) SSTable found, skipping
-            if (descriptor.temporary) // in the process of being written
+            if (descriptor.type.isTemporary) // in the process of being written
                 continue;
 
             if (!descriptor.isCompatible())
@@ -710,7 +710,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                descriptor.ksname,
                                                descriptor.cfname,
                                                fileIndexGenerator.incrementAndGet(),
-                                               false);
+                                               Descriptor.Type.FINAL);
             }
             while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
 
@@ -789,7 +789,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                          keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
-                                         true);
+                                         Descriptor.Type.TEMP);
         return desc.filenameFor(Component.DATA);
     }
 
@@ -1362,11 +1362,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.markObsolete(sstables, compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
-    {
-        data.replaceCompactedSSTables(sstables, replacements, compactionType);
-    }
-
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
         compactionStrategy.replaceFlushed(memtable, sstable);

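The hunks above also show the Descriptor API change the rewriter relies on:
the old boolean Descriptor.temporary becomes a Type field, with at least the
two values visible in this patch (TEMP and FINAL), and call sites test
type.isTemporary instead. For example, using the names from this hunk:

    // was: if (desc.temporary) ...
    if (desc.type.isTemporary)
        SSTable.delete(desc, components); // clean up in-progress leftovers at startup
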
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 9c8f9a0..e574143 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -195,11 +195,12 @@ public class DataTracker
         while (true)
         {
             View currentView = view.get();
-            Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
-            if (inactive.size() < Iterables.size(sstables))
+            Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
+            Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting);
+            if (inactive.size() < set.size())
                 return false;
 
-            View newView = currentView.markCompacting(inactive);
+            View newView = currentView.markCompacting(set);
             if (view.compareAndSet(currentView, newView))
                 return true;
         }
@@ -245,10 +246,12 @@ public class DataTracker
         notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
+    // Note that this DOES NOT insert the replacement sstables: it only removes the old sstables and notifies any
+    // listeners that they have been replaced by the provided sstables, whose insertion must already have been performed by an earlier replaceReaders() call
+    public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
     {
-        replace(sstables, replacements);
-        notifySSTablesChanged(sstables, replacements, compactionType);
+        replace(sstables, Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, allReplacements, compactionType);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)

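Splitting the old replaceCompactedSSTables() into replaceReaders() plus
markCompactedSSTablesReplaced() is what lets preemptively opened results serve
reads before the compaction is complete. Callers now follow a two-phase
pattern, roughly as below (oldSSTables/newSSTables are placeholders):

    DataTracker tracker = cfs.getDataTracker();
    // phase 1: make the (possibly still partial) replacements visible to reads
    tracker.replaceReaders(oldSSTables, newSSTables);
    // phase 2: once the operation completes, drop the originals and notify
    // listeners that newSSTables superseded them
    tracker.markCompactedSSTablesReplaced(oldSSTables, newSSTables, OperationType.COMPACTION);
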
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index a6b7efa..1350be2 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -501,7 +501,7 @@ public class Directories
                     if (pair == null)
                         return false;
 
-                    if (skipTemporary && pair.left.temporary)
+                    if (skipTemporary && pair.left.type.isTemporary)
                         return false;
 
                     Set<Component> previous = components.get(pair.left);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b7b3782..0691819 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -169,7 +169,7 @@ public abstract class AbstractCompactionStrategy
 
     public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
     {
-        return new CompactionTask(cfs, sstables, gcBefore);
+        return new CompactionTask(cfs, sstables, gcBefore, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index f1fcbd1..5f9c7ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.io.util.DiskAwareRunnable;
 public abstract class AbstractCompactionTask extends DiskAwareRunnable
 {
     protected final ColumnFamilyStore cfs;
-    protected Iterable<SSTableReader> sstables;
+    protected Set<SSTableReader> sstables;
     protected boolean isUserDefined;
     protected OperationType compactionType;
 
@@ -36,7 +36,7 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
      * @param cfs
      * @param sstables must be marked compacting
      */
-    public AbstractCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+    public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.sstables = sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1fc1eda..3fe5c26 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -127,7 +127,7 @@ public class CompactionController implements AutoCloseable
                         candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
             }
         }
-        return new HashSet<SSTableReader>(candidates);
+        return new HashSet<>(candidates);
     }
 
     public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b1f0c2a..8343e86 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularData;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -74,6 +75,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
@@ -424,12 +426,8 @@ public class CompactionManager implements CompactionManagerMBean
         }
         cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
         cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-        Collection<SSTableReader> antiCompactedSSTables = null;
         if (!sstables.isEmpty())
-            antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
-        // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
-        if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
-            cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+            doAntiCompaction(cfs, ranges, sstables, repairedAt);
         SSTableReader.releaseReferences(sstables);
         cfs.getDataTracker().unmarkCompacting(sstables);
         logger.info("Completed anticompaction successfully");
@@ -585,7 +583,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
     {
-        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted);
+        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
 
         CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
         metrics.beginCompaction(scrubInfo);
@@ -598,14 +596,6 @@ public class CompactionManager implements CompactionManagerMBean
             scrubber.close();
             metrics.finishCompaction(scrubInfo);
         }
-
-        if (scrubber.getNewInOrderSSTable() != null)
-            cfs.addSSTable(scrubber.getNewInOrderSSTable());
-
-        if (scrubber.getNewSSTable() == null)
-            cfs.markObsolete(Collections.singletonList(sstable), OperationType.SCRUB);
-        else
-            cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
     }
 
     /**
@@ -683,9 +673,10 @@ public class CompactionManager implements CompactionManagerMBean
 
         for (SSTableReader sstable : sstables)
         {
-            if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+            if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
             {
-                cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+                cfs.getDataTracker().replaceReaders(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+                cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
                 continue;
             }
             if (!needsCleanup(sstable, ranges))
@@ -714,20 +705,18 @@ public class CompactionManager implements CompactionManagerMBean
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
             metrics.beginCompaction(ci);
-            SSTableWriter writer = createWriter(cfs,
-                                                compactionFileLocation,
-                                                expectedBloomFilterSize,
-                                                sstable.getSSTableMetadata().repairedAt,
-                                                sstable);
-            SSTableReader newSstable = null;
+            SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+
             try
             {
+                writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
                 while (scanner.hasNext())
                 {
                     if (ci.isStopRequested())
                         throw new CompactionInterruptedException(ci.getCompactionInfo());
-                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                     row = cleanupStrategy.cleanup(row);
                     if (row == null)
                         continue;
@@ -735,10 +724,11 @@ public class CompactionManager implements CompactionManagerMBean
                     if (writer.append(compactedRow) != null)
                         totalkeysWritten++;
                 }
-                if (totalkeysWritten > 0)
-                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
-                else
-                    writer.abort();
+
+                // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+                cfs.indexManager.flushIndexesBlocking();
+
+                writer.finish();
             }
             catch (Throwable e)
             {
@@ -752,23 +742,18 @@ public class CompactionManager implements CompactionManagerMBean
                 metrics.finishCompaction(ci);
             }
 
-            List<SSTableReader> results = new ArrayList<SSTableReader>(1);
-            if (newSstable != null)
+            List<SSTableReader> results = writer.finished();
+            if (!results.isEmpty())
             {
-                results.add(newSstable);
-
                 String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                 long startsize = sstable.onDiskLength();
-                long endsize = newSstable.onDiskLength();
+                long endsize = 0;
+                for (SSTableReader newSstable : results)
+                    endsize += newSstable.onDiskLength();
                 double ratio = (double) endsize / (double) startsize;
-                logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+                logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
             }
-
-            // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
-            cfs.indexManager.flushIndexesBlocking();
-
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
         }
     }
 
@@ -989,14 +974,21 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             logger.info("Anticompacting {}", sstable);
+            Set<SSTableReader> sstableAsSet = new HashSet<>();
+            sstableAsSet.add(sstable);
+
             File destination = cfs.directories.getDirectoryForCompactedSSTables();
-            SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
-            SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+
+            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
 
             try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
-                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-                List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
+                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+
                 CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
 
                 try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
@@ -1018,16 +1010,13 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
                 }
+                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+                repairedSSTableWriter.finish(false, repairedAt);
+                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
                 // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
-                if (repairedKeyCount > 0)
-                    anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
-                else
-                    repairedSSTableWriter.abort();
-                // supply null as we keep SSTableMetadata#repairedAt empty if the table isn't repaired
-                if (unrepairedKeyCount > 0)
-                    anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
-                else
-                    unRepairedSSTableWriter.abort();
+                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
             }
             catch (Throwable e)
             {

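The cleanup and anticompaction changes above both drive the new SSTableRewriter
through the same lifecycle; a condensed sketch, with the inputs hoisted into
parameters (abort() as the failure hook is an assumption about the new class;
the other calls all appear in the hunks above):

    static List<SSTableReader> rewrite(ColumnFamilyStore cfs,
                                       Set<SSTableReader> toRewrite,
                                       SSTableWriter firstWriter,
                                       Iterator<AbstractCompactedRow> rows,
                                       long maxDataAge) throws Throwable
    {
        SSTableRewriter rewriter = new SSTableRewriter(cfs, toRewrite, maxDataAge, OperationType.CLEANUP, false);
        try
        {
            rewriter.switchWriter(firstWriter); // attach the first output sstable
            while (rows.hasNext())
                rewriter.append(rows.next());   // may preemptively open partial results as it goes
            rewriter.finish();                  // seal the outputs and swap out the originals
        }
        catch (Throwable t)
        {
            rewriter.abort();                   // assumed failure hook: discard partial output
            throw t;
        }
        return rewriter.finished();             // the readers now serving reads
    }
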
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f94ef93..77dc7b0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.*;
-import com.google.common.collect.*;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -45,15 +52,15 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
+    private final boolean offline;
     protected static long totalBytesCompacted = 0;
-    private Set<SSTableReader> toCompact;
     private CompactionExecutorStatsCollector collector;
 
-    public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, final int gcBefore)
+    public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
     {
-        super(cfs, sstables);
+        super(cfs, Sets.newHashSet(sstables));
         this.gcBefore = gcBefore;
-        toCompact = Sets.newHashSet(sstables);
+        this.offline = offline;
     }
 
     public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -65,23 +72,23 @@ public class CompactionTask extends AbstractCompactionTask
     {
         this.collector = collector;
         run();
-        return toCompact.size();
+        return sstables.size();
     }
 
     public long getExpectedWriteSize()
     {
-        return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+        return cfs.getExpectedCompactedFileSize(sstables, compactionType);
     }
 
     public boolean reduceScopeForLimitedSpace()
     {
-        if (partialCompactionsAcceptable() && toCompact.size() > 1)
+        if (partialCompactionsAcceptable() && sstables.size() > 1)
         {
             // Try again w/o the largest one.
-            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(toCompact, ", "));
+            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
-            return toCompact.remove(cfs.getMaxSizeFile(toCompact));
+            return sstables.remove(cfs.getMaxSizeFile(sstables));
         }
         else
         {
@@ -100,7 +107,7 @@ public class CompactionTask extends AbstractCompactionTask
         // it is not empty, it may compact down to nothing if all rows are deleted.
         assert sstables != null && sstableDirectory != null;
 
-        if (toCompact.size() == 0)
+        if (sstables.size() == 0)
             return;
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -111,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
         // sanity check: all sstables must belong to the same cfs
-        assert !Iterables.any(toCompact, new Predicate<SSTableReader>()
+        assert !Iterables.any(sstables, new Predicate<SSTableReader>()
         {
             @Override
             public boolean apply(SSTableReader sstable)
@@ -120,15 +127,15 @@ public class CompactionTask extends AbstractCompactionTask
             }
         });
 
-        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+        UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(toCompact);
-        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+        CompactionController controller = getCompactionController(sstables);
+        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
-        logger.info("Compacting {}", toCompact);
+        logger.info("Compacting {}", sstables);
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
@@ -137,19 +144,18 @@ public class CompactionTask extends AbstractCompactionTask
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
+        // TODO: errors when creating the scanners can result in untidied resources
         AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
 
         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
         // replace the old entries.  Track entries to preheat here until then.
-        Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<>();
-
-        Collection<SSTableReader> sstables = new ArrayList<>();
-        Collection<SSTableWriter> writers = new ArrayList<>();
         long minRepairedAt = getMinRepairedAt(actuallyCompact);
+        // we only need the age of the data that we're actually retaining
+        long maxAge = getMaxDataAge(actuallyCompact);
         if (collector != null)
             collector.beginCompaction(ci);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
         try
         {
             if (!iter.hasNext())
@@ -157,75 +163,34 @@ public class CompactionTask extends AbstractCompactionTask
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
-                cfs.markObsolete(toCompact, compactionType);
+                cfs.markObsolete(sstables, compactionType);
                 return;
             }
 
-            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
-            writers.add(writer);
+            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
             while (iter.hasNext())
             {
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                 AbstractCompactedRow row = iter.next();
-                RowIndexEntry indexEntry = writer.append(row);
-                if (indexEntry == null)
-                {
-                    controller.invalidateCachedRow(row.key);
-                    row.close();
-                    continue;
-                }
-
-                totalKeysWritten++;
-
-                if (DatabaseDescriptor.getPreheatKeyCache())
+                if (writer.append(row) != null)
                 {
-                    for (SSTableReader sstable : actuallyCompact)
+                    totalKeysWritten++;
+                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
                     {
-                        if (sstable.getCachedPosition(row.key, false) != null)
-                        {
-                            cachedKeys.put(row.key, indexEntry);
-                            break;
-                        }
+                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
                     }
                 }
-
-                if (newSSTableSegmentThresholdReached(writer))
-                {
-                    // tmp = false because later we want to query it with descriptor from SSTableReader
-                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
-                    writers.add(writer);
-                    cachedKeys = new HashMap<>();
-                }
             }
 
-            if (writer.getFilePointer() > 0)
-            {
-                cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-            }
-            else
-            {
-                writer.abort();
-                writers.remove(writer);
-            }
-
-            long maxAge = getMaxDataAge(toCompact);
-            for (SSTableWriter completedWriter : writers)
-                sstables.add(completedWriter.closeAndOpenReader(maxAge));
+            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+            writer.finish(false);
         }
         catch (Throwable t)
         {
-            for (SSTableWriter writer : writers)
-                writer.abort();
-            // also remove already completed SSTables
-            for (SSTableReader sstable : sstables)
-            {
-                sstable.markObsolete();
-                sstable.releaseReference();
-            }
-            throw Throwables.propagate(t);
+            writer.abort();
+            throw t;
         }
         finally
         {
@@ -251,20 +216,19 @@ public class CompactionTask extends AbstractCompactionTask
             }
         }
 
-        replaceCompactedSSTables(toCompact, sstables);
-        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-        for (SSTableReader sstable : sstables)
-            sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+        Collection<SSTableReader> oldSStables = this.sstables;
+        List<SSTableReader> newSStables = writer.finished();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 
         // log a bunch of statistics about the result and save to system table compaction_history
         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(toCompact);
-        long endsize = SSTableReader.getTotalBytes(sstables);
+        long startsize = SSTableReader.getTotalBytes(oldSStables);
+        long endsize = SSTableReader.getTotalBytes(newSStables);
         double ratio = (double) endsize / (double) startsize;
 
-        StringBuilder builder = new StringBuilder();
-        for (SSTableReader reader : sstables)
-            builder.append(reader.descriptor.baseFilename()).append(",");
+        StringBuilder newSSTableNames = new StringBuilder();
+        for (SSTableReader reader : newSStables)
+            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 
         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
         long totalSourceRows = 0;
@@ -285,7 +249,7 @@ public class CompactionTask extends AbstractCompactionTask
 
         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
         logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                         toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
         logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
@@ -307,7 +271,7 @@ public class CompactionTask extends AbstractCompactionTask
                                  repairedAt,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
+                                 new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
     }
 
     protected int getLevel()
@@ -315,11 +279,6 @@ public class CompactionTask extends AbstractCompactionTask
         return 0;
     }
 
-    protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
-    {
-        cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
-    }
-
     protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
     {
         return new CompactionController(cfs, toCompact, gcBefore);
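
The hunks above are the heart of the patch for CompactionTask: the hand-rolled writer
list, the per-writer key-cache map, and the manual abort/cleanup all move into the new
SSTableRewriter, which owns the writer lifecycle end to end. Reading the call sites:
switchWriter() rolls to a fresh SSTableWriter whenever the segment threshold is reached,
append() writes a row (returning null when the row compacts away), finish(false) seals
the output without yet swapping in the new sstables (so the compaction can first be
marked finished in the system table), and abort() tears everything down, letting the
catch block rethrow the original Throwable instead of wrapping it via
Throwables.propagate(). Below is a minimal, self-contained sketch of that lifecycle;
every type in it is an illustrative stand-in, not Cassandra's actual API.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the writer lifecycle CompactionTask now delegates to
    // SSTableRewriter; only the call pattern mirrors the diff above.
    final class RewriterSketch
    {
        interface Writer
        {
            boolean append(String row); // false => row compacted away entirely
            String closeAndOpen();      // seal the file, return the new table
            void abort();               // delete partial output
        }

        private final List<String> finished = new ArrayList<>();
        private Writer current;

        // roll to a fresh writer, sealing the previous one
        void switchWriter(Writer next)
        {
            if (current != null)
                finished.add(current.closeAndOpen());
            current = next;
        }

        boolean append(String row)
        {
            return current.append(row);
        }

        // seal the last writer; the caller decides when the old sstables
        // are actually replaced (mirrors writer.finish(false) above)
        void finish()
        {
            if (current != null)
            {
                finished.add(current.closeAndOpen());
                current = null;
            }
        }

        // on failure discard all output, including already-sealed tables
        void abort()
        {
            if (current != null)
                current.abort();
            finished.clear();
            current = null;
        }

        List<String> finished()
        {
            return finished;
        }
    }

Centralising this in one class is what makes preemptive open workable: there is a single
owner for every partially written sstable, so early-opened readers and failure cleanup
both have one place to look.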

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index f64f633..2731b6d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -30,7 +30,7 @@ public class LeveledCompactionTask extends CompactionTask
 
     public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes)
     {
-        super(cfs, sstables, gcBefore);
+        super(cfs, sstables, gcBefore, false);
         this.level = level;
         this.maxSSTableBytes = maxSSTableBytes;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index a14ab43..67705e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -57,7 +57,7 @@ public class SSTableSplitter {
 
         public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
         {
-            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
             this.sstableSizeInMB = sstableSizeInMB;
 
             if (sstableSizeInMB <= 0)
@@ -71,11 +71,6 @@ public class SSTableSplitter {
         }
 
         @Override
-        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
-        {
-        }
-
-        @Override
         protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
         {
             return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 01da2e1..d61f62b 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.utils.OutputHandler;
 
 public class Scrubber implements Closeable
 {
-    public final ColumnFamilyStore cfs;
-    public final SSTableReader sstable;
-    public final File destination;
-    public final boolean skipCorrupted;
+    private final ColumnFamilyStore cfs;
+    private final SSTableReader sstable;
+    private final File destination;
+    private final boolean skipCorrupted;
 
     private final CompactionController controller;
     private final boolean isCommutative;
@@ -46,7 +46,8 @@ public class Scrubber implements Closeable
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
 
-    private SSTableWriter writer;
+    private final boolean isOffline;
+
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
 
@@ -65,9 +66,9 @@ public class Scrubber implements Closeable
     };
     private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException
     {
-        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), false);
+        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline);
     }
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException
@@ -76,6 +77,7 @@ public class Scrubber implements Closeable
         this.sstable = sstable;
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
+        this.isOffline = isOffline;
 
         // Calculate the expected compacted filesize
         this.destination = cfs.directories.getDirectoryForCompactedSSTables();
@@ -104,6 +106,7 @@ public class Scrubber implements Closeable
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,8 +116,7 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable);
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
             DecoratedKey prevKey = null;
 
@@ -166,7 +168,6 @@ public class Scrubber implements Closeable
 
                 assert currentIndexKey != null || indexFile.isEOF();
 
-                writer.mark();
                 try
                 {
                     if (key == null)
@@ -182,7 +183,7 @@ public class Scrubber implements Closeable
                     }
 
                     AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                    if (writer.append(compactedRow) == null)
+                    if (writer.tryAppend(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
@@ -194,7 +195,6 @@ public class Scrubber implements Closeable
                 {
                     throwIfFatal(th);
                     outputHandler.warn("Error reading row (stacktrace follows):", th);
-                    writer.resetAndTruncate();
 
                     if (currentIndexKey != null
                         && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
@@ -212,7 +212,7 @@ public class Scrubber implements Closeable
                             }
 
                             AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                            if (writer.append(compactedRow) == null)
+                            if (writer.tryAppend(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
@@ -224,7 +224,6 @@ public class Scrubber implements Closeable
                             throwIfCommutative(key, th2);
 
                             outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                            writer.resetAndTruncate();
                             dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
@@ -241,16 +240,27 @@ public class Scrubber implements Closeable
                 }
             }
 
-            if (writer.getFilePointer() > 0)
+            if (!outOfOrderRows.isEmpty())
             {
+                // rows were out of order; if no bad rows were also found, we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-                newSstable = writer.closeAndOpenReader(sstable.maxDataAge, repairedAt);
+                SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+                for (Row row : outOfOrderRows)
+                    inOrderWriter.append(row.key, row.cf);
+                newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+                if (!isOffline)
+                    cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+                outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
             }
+
+            // finish obsoletes the old sstable
+            writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+            if (!writer.finished().isEmpty())
+                newSstable = writer.finished().get(0);
         }
         catch (Throwable t)
         {
-            if (writer != null)
-                writer.abort();
+            writer.abort();
             throw Throwables.propagate(t);
         }
         finally
@@ -258,17 +268,6 @@ public class Scrubber implements Closeable
             controller.close();
         }
 
-        if (!outOfOrderRows.isEmpty())
-        {
-            // out of order rows, but no bad rows found - we can keep our repairedAt time
-            long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
-            for (Row row : outOfOrderRows)
-                inOrderWriter.append(row.key, row.cf);
-            newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
-            outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
-        }
-
         if (newSstable == null)
         {
             if (badRows > 0)
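
Two things to note in the Scrubber hunks. First, the explicit writer.mark() /
resetAndTruncate() pairs around each row are gone: judging by the call sites,
SSTableRewriter.tryAppend() now encapsulates that snapshot-and-rollback itself (an
inference from this diff, not a quote of the rewriter's code). Second, the
out-of-order-row pass moves inside the main try block, and when the scrub runs online
the in-order sstable is registered with the DataTracker right away. A self-contained
sketch of the rollback behaviour assumed to live behind tryAppend(), with invented types:

    // Sketch of the snapshot-and-rollback inferred from the removed
    // mark()/resetAndTruncate() calls: remember the output length,
    // attempt the append, truncate back on failure before rethrowing.
    final class TryAppendSketch
    {
        interface Row
        {
            // false => row was empty; may throw on corrupt data
            boolean writeTo(StringBuilder out);
        }

        private final StringBuilder out = new StringBuilder();

        boolean tryAppend(Row row)
        {
            int mark = out.length();          // writer.mark()
            try
            {
                return row.writeTo(out);
            }
            catch (RuntimeException corrupt)
            {
                out.setLength(mark);          // writer.resetAndTruncate()
                throw corrupt;                // caller still sees the failure
            }
        }
    }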

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 763d20b..461c5e1 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -270,7 +270,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 return null;
 
             if (cfs.getDataTracker().markCompacting(hottestBucket))
-                return new CompactionTask(cfs, hottestBucket, gcBefore);
+                return new CompactionTask(cfs, hottestBucket, gcBefore, false);
         }
     }
 
@@ -289,7 +289,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             else
                 unrepaired.add(sstable);
         }
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore, false), new CompactionTask(cfs, unrepaired, gcBefore, false));
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
@@ -302,7 +302,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             return null;
         }
 
-        return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
+        return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
     }
 
     public int getEstimatedRemainingTasks()
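
The LeveledCompactionTask, SSTableSplitter, and SizeTieredCompactionStrategy changes all
thread one new constructor argument through to CompactionTask: a boolean marking the
task as offline. Ordinary compactions pass false; standalone operations such as sstable
splitting pass true, which keeps the SSTableRewriter from publishing results to the live
DataTracker. That is also why SplittingCompactionTask's empty
replaceCompactedSSTables() override can simply be deleted along with the hook itself. A
tiny sketch of the distinction the flag encodes (types invented):

    import java.util.List;

    // Sketch of the online/offline split the new boolean selects;
    // Tracker and the method names are illustrative stand-ins.
    final class OfflineFlagSketch
    {
        interface Tracker
        {
            void replace(List<String> oldTables, List<String> newTables);
        }

        private final boolean offline;

        OfflineFlagSketch(boolean offline)
        {
            this.offline = offline;
        }

        void publish(Tracker tracker, List<String> oldTables, List<String> newTables)
        {
            // offline tools (e.g. sstablesplit) must not touch live state
            if (!offline)
                tracker.replace(oldTables, newTables);
        }
    }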

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 740a3eb..734fe23 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -34,7 +35,7 @@ public class Upgrader
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
-    private final Collection<SSTableReader> toUpgrade;
+    private final Set<SSTableReader> toUpgrade;
     private final File directory;
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -48,7 +49,7 @@ public class Upgrader
     {
         this.cfs = cfs;
         this.sstable = sstable;
-        this.toUpgrade = Collections.singletonList(sstable);
+        this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
         this.outputHandler = outputHandler;
 
         this.directory = new File(sstable.getFilename()).getParentFile();
@@ -86,56 +87,28 @@ public class Upgrader
     {
         outputHandler.output("Upgrading " + sstable);
 
-
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller);
-
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
-        try
+        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
+        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
         {
-            SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
-            writers.add(writer);
+            writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {
                 AbstractCompactedRow row = iter.next();
-
                 writer.append(row);
             }
 
-            long maxAge = CompactionTask.getMaxDataAge(this.toUpgrade);
-            for (SSTableWriter completedWriter : writers)
-                sstables.add(completedWriter.closeAndOpenReader(maxAge));
-
+            writer.finish();
             outputHandler.output("Upgrade of " + sstable + " complete.");
 
         }
         catch (Throwable t)
         {
-            for (SSTableWriter writer : writers)
-                writer.abort();
-            // also remove already completed SSTables
-            for (SSTableReader sstable : sstables)
-            {
-                sstable.markObsolete();
-                sstable.releaseReference();
-            }
+            writer.abort();
             throw Throwables.propagate(t);
         }
         finally
         {
             controller.close();
-
-            try
-            {
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
         }
     }
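
Upgrader gets the same rewriter treatment as CompactionTask, plus one tidy-up: the
CloseableIterator now lives in a try-with-resources header, so the manual iter.close()
and its IOException-to-RuntimeException wrapper in the finally block disappear. A
minimal sketch of the resulting shape, with stand-in types:

    // Sketch of the resource shape after the change: the row iterator is
    // managed by try-with-resources, while the rewriter is aborted
    // explicitly on failure. All types here are stand-ins.
    final class UpgradeSketch
    {
        interface RowIterator extends AutoCloseable
        {
            boolean hasNext();
            String next();
            @Override
            void close(); // unchecked in this sketch; the real CloseableIterator may differ
        }

        interface Rewriter
        {
            void append(String row);
            void finish();
            void abort();
        }

        static void upgrade(RowIterator rows, Rewriter writer)
        {
            try (RowIterator iter = rows)
            {
                while (iter.hasNext())
                    writer.append(iter.next());
                writer.finish();
            }
            catch (Throwable t)
            {
                writer.abort();
                throw t; // precise rethrow: only unchecked throwables are possible here
            }
        }
    }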
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index b8a21cc..e533b1e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -56,19 +56,17 @@ public class CompressedSequentialWriter extends SequentialWriter
 
     public CompressedSequentialWriter(File file,
                                       String offsetsPath,
-                                      boolean skipIOCache,
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength(), skipIOCache);
+        super(file, parameters.chunkLength());
         this.compressor = parameters.sstableCompressor;
 
         // buffer for compression should be the same size as buffer itself
         compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
 
         /* Index File (-CompressionInfo.db component) and its header */
-        metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
-        metadataWriter.writeHeader(parameters);
+        metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
@@ -102,8 +100,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     @Override
     protected void flushData()
     {
-        seekToChunkStart();
-
+        seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
 
         int compressedLength;
         try
@@ -122,7 +119,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         try
         {
             // write an offset of the newly written chunk to the index file
-            metadataWriter.writeLong(chunkOffset);
+            metadataWriter.addOffset(chunkOffset);
             chunkCount++;
 
             assert compressedLength <= compressed.buffer.length;
@@ -131,6 +128,7 @@ public class CompressedSequentialWriter extends SequentialWriter
             out.write(compressed.buffer, 0, compressedLength);
             // write corresponding checksum
             crcMetadata.append(compressed.buffer, 0, compressedLength);
+            lastFlushOffset += compressedLength + 4;
         }
         catch (IOException e)
         {
@@ -141,6 +139,17 @@ public class CompressedSequentialWriter extends SequentialWriter
         chunkOffset += compressedLength + 4;
     }
 
+    public CompressionMetadata openEarly()
+    {
+        return metadataWriter.openEarly(originalSize, chunkOffset);
+    }
+
+    public CompressionMetadata openAfterClose()
+    {
+        assert current == originalSize;
+        return metadataWriter.openAfterClose(current, chunkOffset);
+    }
+
     @Override
     public FileMark mark()
     {
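
These two additions are where the commit title becomes concrete for compressed sstables:
openEarly() builds a CompressionMetadata covering only the chunks flushed so far (plus
the uncompressed size reached), so a reader can be opened against the still-growing
file, while openAfterClose() yields the final metadata once writing is complete. The new
lastFlushOffset bookkeeping in flushData() appears to track how many compressed bytes
(each chunk plus what looks like its 4-byte checksum) are actually on disk, which is the
boundary such an early open has to respect. A self-contained sketch of the early-open
idea over an append-only offset log (all names invented):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch of "early open": snapshot the chunk offsets recorded so far,
    // so a reader can map every fully flushed chunk of a file that is
    // still being written. Only the idea mirrors openEarly()/openAfterClose().
    final class EarlyOpenSketch
    {
        private final List<Long> chunkOffsets = new ArrayList<>();
        private boolean closed = false;

        void addOffset(long offset)        // one entry per flushed chunk
        {
            chunkOffsets.add(offset);
        }

        // immutable snapshot covering only complete chunks; the writer can
        // keep appending without invalidating readers holding the snapshot
        List<Long> openEarly()
        {
            return Collections.unmodifiableList(new ArrayList<>(chunkOffsets));
        }

        // final view, only valid once the writer has been closed
        List<Long> openAfterClose()
        {
            if (!closed)
                throw new IllegalStateException("writer not closed");
            return Collections.unmodifiableList(chunkOffsets);
        }

        void close()
        {
            closed = true;
        }
    }
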
@@ -246,10 +255,9 @@ public class CompressedSequentialWriter extends SequentialWriter
 
         super.close();
         sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
-        metadataWriter.finalizeHeader(current, chunkCount);
         try
         {
-            metadataWriter.close();
+            metadataWriter.close(current, chunkCount);
         }
         catch (IOException e)
         {