Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:29 UTC

[1/3] Preemptive open of compaction results

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 b3a225ef1 -> 4e95953f2


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index d22763b..9f57995 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.cache.*;
 import org.slf4j.Logger;
@@ -41,36 +42,66 @@ public class FileCacheService
 
     public static FileCacheService instance = new FileCacheService();
 
-    private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    private static final AtomicLong cacheKeyIdCounter = new AtomicLong();
+    public static final class CacheKey
+    {
+        final long id;
+        public CacheKey()
+        {
+            this.id = cacheKeyIdCounter.incrementAndGet();
+        }
+        public boolean equals(Object that)
+        {
+            return that instanceof CacheKey && ((CacheKey) that).id == this.id;
+        }
+        public int hashCode()
+        {
+            return (int) id;
+        }
+    }
+
+    private static final Callable<CacheBucket> cacheForPathCreator = new Callable<CacheBucket>()
     {
         @Override
-        public Queue<RandomAccessReader> call()
+        public CacheBucket call()
         {
-            return new ConcurrentLinkedQueue<RandomAccessReader>();
+            return new CacheBucket();
         }
     };
 
     private static final AtomicInteger memoryUsage = new AtomicInteger();
 
-    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final Cache<CacheKey, CacheBucket> cache;
     private final FileCacheMetrics metrics = new FileCacheMetrics();
 
+    private static final class CacheBucket
+    {
+        final ConcurrentLinkedQueue<RandomAccessReader> queue = new ConcurrentLinkedQueue<>();
+        volatile boolean discarded = false;
+    }
+
     protected FileCacheService()
     {
-        RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+        RemovalListener<CacheKey, CacheBucket> onRemove = new RemovalListener<CacheKey, CacheBucket>()
         {
             @Override
-            public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+            public void onRemoval(RemovalNotification<CacheKey, CacheBucket> notification)
             {
-                Queue<RandomAccessReader> cachedInstances = notification.getValue();
-                if (cachedInstances == null)
+                CacheBucket bucket = notification.getValue();
+                if (bucket == null)
                     return;
 
-                if (cachedInstances.size() > 0)
-                    logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
-
-                for (RandomAccessReader reader : cachedInstances)
+                // set discarded before deallocating the readers, to ensure we don't leak any
+                bucket.discarded = true;
+                Queue<RandomAccessReader> q = bucket.queue;
+                boolean first = true;
+                for (RandomAccessReader reader = q.poll() ; reader != null ; reader = q.poll())
                 {
+                    if (logger.isDebugEnabled() && first)
+                    {
+                        logger.debug("Evicting cold readers for {}", reader.getPath());
+                        first = false;
+                    }
                     memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
                     reader.deallocate();
                 }
@@ -81,15 +112,16 @@ public class FileCacheService
                 .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
                 .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
                 .removalListener(onRemove)
+                .initialCapacity(16 << 10)
                 .build();
     }
 
-    public RandomAccessReader get(String path)
+    public RandomAccessReader get(CacheKey key)
     {
         metrics.requests.mark();
 
-        Queue<RandomAccessReader> instances = getCacheFor(path);
-        RandomAccessReader result = instances.poll();
+        CacheBucket bucket = getCacheFor(key);
+        RandomAccessReader result = bucket.queue.poll();
         if (result != null)
         {
             metrics.hits.mark();
@@ -99,11 +131,11 @@ public class FileCacheService
         return result;
     }
 
-    private Queue<RandomAccessReader> getCacheFor(String path)
+    private CacheBucket getCacheFor(CacheKey key)
     {
         try
         {
-            return cache.get(path, cacheForPathCreator);
+            return cache.get(key, cacheForPathCreator);
         }
         catch (ExecutionException e)
         {
@@ -111,34 +143,46 @@ public class FileCacheService
         }
     }
 
-    public void put(RandomAccessReader instance)
+    public void put(CacheKey cacheKey, RandomAccessReader instance)
     {
         int memoryUsed = memoryUsage.get();
         if (logger.isDebugEnabled())
             logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());
 
-        if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+        CacheBucket bucket = cache.getIfPresent(cacheKey);
+        if (memoryUsed >= MEMORY_USAGE_THRESHOLD || bucket == null)
         {
             instance.deallocate();
         }
         else
         {
             memoryUsage.addAndGet(instance.getTotalBufferSize());
-            getCacheFor(instance.getPath()).add(instance);
+            bucket.queue.add(instance);
+            if (bucket.discarded)
+            {
+                RandomAccessReader reader = bucket.queue.poll();
+                if (reader != null)
+                {
+                    memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+                    reader.deallocate();
+                }
+            }
         }
     }
 
-    public void invalidate(String path)
+    public void invalidate(CacheKey cacheKey, String path)
     {
-        logger.debug("Invalidating cache for {}", path);
-        cache.invalidate(path);
+        if (logger.isDebugEnabled())
+            logger.debug("Invalidating cache for {}", path);
+        cache.invalidate(cacheKey);
     }
 
+    // TODO: this method is unsafe, as it calls getTotalBufferSize() on items that can have been discarded
     public long sizeInBytes()
     {
         long n = 0;
-        for (Queue<RandomAccessReader> queue : cache.asMap().values())
-            for (RandomAccessReader reader : queue)
+        for (CacheBucket bucket : cache.asMap().values())
+            for (RandomAccessReader reader : bucket.queue)
                 n += reader.getTotalBufferSize();
         return n;
     }

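A minimal usage sketch of the reworked pooling API (illustrative only: the PooledFile class and its fallback open are hypothetical, not part of this patch). Each owner of a file allocates one opaque CacheKey, borrows and recycles readers through it, and invalidates the key when the file is no longer needed:

    import java.io.File;
    import org.apache.cassandra.io.util.RandomAccessReader;
    import org.apache.cassandra.service.FileCacheService;

    // Hypothetical owner of a pooled file; only FileCacheService and CacheKey come from this patch.
    class PooledFile
    {
        private final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
        private final String path;

        PooledFile(String path)
        {
            this.path = path;
        }

        RandomAccessReader borrow()
        {
            // get() returns null on a cache miss, so fall back to opening a fresh reader
            RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
            return reader != null ? reader : RandomAccessReader.open(new File(path));
        }

        void recycle(RandomAccessReader reader)
        {
            // put() deallocates the reader itself if the bucket was evicted or memory is over the threshold
            FileCacheService.instance.put(cacheKey, reader);
        }

        void close()
        {
            // evicts this key's bucket, deallocating any readers still pooled under it
            FileCacheService.instance.invalidate(cacheKey, path);
        }
    }

Keying the pool on an opaque id rather than the path presumably lets pooled readers for an obsolete instance of a file be discarded independently of any newer instance opened for the same data.
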
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
index 0eb01c5..4d20479 100644
--- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -21,15 +21,20 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Encapsulates the behavior for 'locking' any streamed sstables to a node.
@@ -69,7 +74,7 @@ public class StreamLockfile
             /* write out the file names *without* the 'tmp-file' flag in the file name.
                this class will not need to clean up tmp files (on restart), CassandraDaemon does that already,
                just make sure we delete the fully-formed SSTRs. */
-            sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename());
+            sstablePaths.add(writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename());
         }
 
         try

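The asTemporary(boolean) call above is replaced by asType(Descriptor.Type), using the Type enum added to Descriptor in part 2/3 of this commit. The method body is not part of this diff, but given the Descriptor constructor shown there, a plausible sketch (inside Descriptor.java) is:

    // Sketch only; the real implementation may differ.
    public Descriptor asType(Type newType)
    {
        return new Descriptor(version, directory, ksname, cfname, generation, newType);
    }
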
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index e91f58f..78d4d9e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -117,17 +117,6 @@ public class StandaloneScrubber
                             scrubber.close();
                         }
 
-                        if (manifest != null)
-                        {
-                            if (scrubber.getNewInOrderSSTable() != null)
-                                manifest.add(scrubber.getNewInOrderSSTable());
-
-                            List<SSTableReader> added = scrubber.getNewSSTable() == null
-                                ? Collections.<SSTableReader>emptyList()
-                                : Collections.singletonList(scrubber.getNewSSTable());
-                            manifest.replace(Collections.singletonList(sstable), added);
-                        }
-
                         // Remove the sstable (it's been copied by scrub and snapshotted)
                         sstable.markObsolete();
                         sstable.releaseReference();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 8b92586..9353ce9 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -140,10 +140,6 @@ public class StandaloneSplitter
                 try
                 {
                     new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
-
-                    // Remove the sstable
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index a00245b..55f206e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -97,9 +97,6 @@ public class StandaloneUpgrader
                 {
                     Upgrader upgrader = new Upgrader(cfs, sstable, handler);
                     upgrader.upgrade();
-
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index ac9f863..1d3c014 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.utils;
 
 import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.lang.reflect.Field;
 
 import org.slf4j.Logger;
@@ -139,6 +142,25 @@ public final class CLibrary
         }
     }
 
+    public static void trySkipCache(String path, long offset, long len)
+    {
+        trySkipCache(getfd(path), offset, len);
+    }
+
+    public static void trySkipCache(int fd, long offset, long len)
+    {
+        if (len == 0)
+            trySkipCache(fd, 0, 0);
+
+        while (len > 0)
+        {
+            int sublen = (int) Math.min(Integer.MAX_VALUE, len);
+            trySkipCache(fd, offset, sublen);
+            len -= sublen;
+            offset += sublen;
+        }
+    }
+
     public static void trySkipCache(int fd, long offset, int len)
     {
         if (fd < 0)
@@ -280,33 +302,30 @@ public final class CLibrary
         return -1;
     }
 
-    /**
-     * Suggest kernel to preheat one page for the given file.
-     *
-     * @param fd The file descriptor of file to preheat.
-     * @param position The offset of the block.
-     *
-     * @return On success, zero is returned. On error, an error number is returned.
-     */
-    public static int preheatPage(int fd, long position)
+    public static int getfd(String path)
     {
+        RandomAccessFile file = null;
         try
         {
-            // 4096 is good for SSD because they operate on "Pages" 4KB in size
-            return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED);
+            file = new RandomAccessFile(path, "r");
+            return getfd(file.getFD());
         }
-        catch (UnsatisfiedLinkError e)
+        catch (Throwable t)
         {
-            // JNA is unavailable just skipping
+            // ignore
+            return -1;
         }
-        catch (RuntimeException e)
+        finally
         {
-            if (!(e instanceof LastErrorException))
-                throw e;
-
-            logger.warn(String.format("posix_fadvise(%d, %d) failed, errno (%d).", fd, position, errno(e)));
+            try
+            {
+                if (file != null)
+                    file.close();
+            }
+            catch (Throwable t)
+            {
+                // ignore
+            }
         }
-
-        return -1;
     }
 }

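The new CLibrary overloads accept a path and a long length: lengths larger than Integer.MAX_VALUE are split into int-sized chunks before being handed to the existing int-length trySkipCache, and a length of zero is passed straight through as a whole-file advise. A hedged usage sketch (the path and size below are made up):

    import org.apache.cassandra.utils.CLibrary;

    public class SkipCacheExample
    {
        public static void main(String[] args)
        {
            String path = "/var/lib/cassandra/data/ks/cf/ks-cf-jb-42-Data.db"; // hypothetical file
            long length = 5L << 30; // 5 GiB, deliberately larger than Integer.MAX_VALUE

            CLibrary.trySkipCache(path, 0, length); // drop the written region from the page cache
            CLibrary.trySkipCache(path, 0, 0);      // len == 0: advise for the whole file
        }
    }
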
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index de8da01..3007292 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.Memory;
 
@@ -42,7 +41,7 @@ public class OffHeapBitSet implements IBitSet
         try
         {
             long byteCount = wordCount * 8L;
-            bytes = RefCountedMemory.allocate(byteCount);
+            bytes = Memory.allocate(byteCount);
         }
         catch (OutOfMemoryError e)
         {
@@ -123,7 +122,7 @@ public class OffHeapBitSet implements IBitSet
     public static OffHeapBitSet deserialize(DataInput in) throws IOException
     {
         long byteCount = in.readInt() * 8L;
-        Memory memory = RefCountedMemory.allocate(byteCount);
+        Memory memory = Memory.allocate(byteCount);
         for (long i = 0; i < byteCount;)
         {
             long v = in.readLong();

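OffHeapBitSet now allocates a plain Memory region instead of a RefCountedMemory, since nothing else shares the allocation. A minimal sketch of the allocate/use/free pairing, using only the Memory methods that appear elsewhere in this commit:

    import org.apache.cassandra.io.util.Memory;

    public class OffHeapExample
    {
        // Off-heap allocations are not garbage collected; pair allocate() with free().
        static void example(long wordCount)
        {
            Memory bytes = Memory.allocate(wordCount * 8L);
            try
            {
                bytes.setLong(0, 0L);              // write the first word
                long firstWord = bytes.getLong(0); // read it back
            }
            finally
            {
                bytes.free();
            }
        }
    }
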
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index d68ba10..35c2b5e 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -99,7 +99,7 @@ public class LongCompactionsTest extends SchemaLoader
 
         long start = System.nanoTime();
         final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
-        new CompactionTask(store, sstables, gcBefore).execute(null);
+        new CompactionTask(store, sstables, gcBefore, false).execute(null);
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b3f7429..d180b82 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -22,15 +22,26 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
@@ -41,28 +52,55 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.LexicalUUIDType;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableSimpleWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.junit.Assert.*;
-import static org.apache.cassandra.Util.*;
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.rp;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class ColumnFamilyStoreTest extends SchemaLoader
@@ -916,8 +954,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false);
-            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false);
+            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
+            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
         }
@@ -1697,7 +1735,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
                 for (int ancestor : ancestors)
                     collector.addAncestor(ancestor);
-                String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
                 return new SSTableWriter(file,
                                          0,
                                          ActiveRepairService.UNREPAIRED_SSTABLE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 97cd21c..05e0beb 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -19,20 +19,25 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -99,7 +104,7 @@ public class DirectoriesTest
 
     private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException
     {
-        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp);
+        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL);
         for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
@@ -122,7 +127,7 @@ public class DirectoriesTest
             Directories directories = new Directories(cfm);
             assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
 
-            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
             File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
 
@@ -224,7 +229,7 @@ public class DirectoriesTest
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {
-                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
                     return Directories.getSnapshotDirectory(desc, n);
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 6ca5487..c48a728 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.AfterClass;
@@ -30,7 +32,9 @@ import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -145,11 +149,22 @@ public class KeyCacheTest extends SchemaLoader
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
+        Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+        for (SSTableReader reader : readers)
+            reader.acquireReference();
+
         Util.compactAll(cfs, Integer.MAX_VALUE).get();
-        // after compaction cache should have entries for
-        // new SSTables, if we had 2 keys in cache previously it should become 4
+        // after compaction cache should have entries for new SSTables,
+        // but since we have kept a reference to the old sstables,
+        // if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
 
+        for (SSTableReader reader : readers)
+            reader.releaseReference();
+
+        // after releasing the reference this should drop to 2
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+
         // re-read same keys to verify that key cache didn't grow further
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key1,
                                                        COLUMN_FAMILY1,
@@ -167,7 +182,7 @@ public class KeyCacheTest extends SchemaLoader
                                                        10,
                                                        System.currentTimeMillis()));
 
-        assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
     }
 
     private void assertKeyCacheSize(int expected, String keyspace, String columnFamily)

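The test now pins the pre-compaction sstables by acquiring a reference before compacting, so their key-cache entries survive the compaction; once the references are released, the cache drops back to the entries for the new sstables, as the updated assertions show. Outside a test, the same pinning pattern is safer wrapped in try/finally; a sketch using only the methods exercised above:

    import java.util.Set;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.io.sstable.SSTableReader;

    public final class PinnedRead
    {
        // Pin the current sstables of a table around an operation, releasing them even on failure.
        public static void withPinnedSSTables(ColumnFamilyStore cfs, Runnable operation)
        {
            Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
            for (SSTableReader reader : readers)
                reader.acquireReference();
            try
            {
                operation.run();
            }
            finally
            {
                for (SSTableReader reader : readers)
                    reader.releaseReference();
            }
        }
    }
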
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index b8c7980..e820fc2 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db;
  *
  */
 
-import java.io.*;
-import java.util.Collections;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -29,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang3.StringUtils;
@@ -37,17 +38,18 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
@@ -112,7 +114,7 @@ public class ScrubTest extends SchemaLoader
         file.close();
 
         // with skipCorrupted == false, the scrub is expected to fail
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
         try
         {
             scrubber.scrub();
@@ -121,10 +123,9 @@ public class ScrubTest extends SchemaLoader
         catch (IOError err) {}
 
         // with skipCorrupted == true, the corrupt row will be skipped
-        scrubber = new Scrubber(cfs, sstable, true);
+        scrubber = new Scrubber(cfs, sstable, true, false);
         scrubber.scrub();
         scrubber.close();
-        cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
         assertEquals(1, cfs.getSSTables().size());
 
         // verify that we can read all of the rows, and there is now one less row
@@ -206,7 +207,7 @@ public class ScrubTest extends SchemaLoader
         assert root != null;
         File rootDir = new File(root);
         assert rootDir.isDirectory();
-        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, false);
+        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL);
         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
 
         try
@@ -227,7 +228,7 @@ public class ScrubTest extends SchemaLoader
         components.add(Component.TOC);
         SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
 
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, true);
         scrubber.scrub();
 
         cfs.loadNewSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f8fcf76..900abd8 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -18,19 +18,22 @@
  */
 package org.apache.cassandra.io.compress;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Collections;
 import java.util.Random;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -61,7 +64,7 @@ public class CompressedRandomAccessReaderTest
         {
 
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
+            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
 
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
@@ -101,8 +104,8 @@ public class CompressedRandomAccessReaderTest
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
-                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+                ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
+                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();
@@ -151,7 +154,7 @@ public class CompressedRandomAccessReaderTest
         metadata.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
-        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
+        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());
         writer.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e26d0f5..2dc07ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -79,7 +79,7 @@ public class LegacySSTableTest extends SchemaLoader
     protected Descriptor getDescriptor(String ver)
     {
         File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, false);
+        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 292a51e..19a0b13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -60,7 +60,7 @@ public class SSTableUtils
         File keyspaceDir = new File(tempdir, keyspaceName);
         keyspaceDir.mkdir();
         keyspaceDir.deleteOnExit();
-        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, false).filenameFor("Data.db"));
+        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db"));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index da0e31a..7751a51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable.metadata;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,8 +27,8 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
@@ -76,7 +75,7 @@ public class MetadataSerializerTest
             serializer.serialize(originalMetadata, out);
         }
 
-        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false);
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
         try (RandomAccessReader in = RandomAccessReader.open(statsFile))
         {
             Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 2a8c7a9..fb45dd3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -132,7 +132,7 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32, true);
+        final SequentialWriter writer = new SequentialWriter(file, 32);
         DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
         DataInput canon = testWrite(write);
         write.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 8d9480b..dbc1ec2 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -63,7 +63,7 @@ public class CompressedInputStreamTest
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
+        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();
         for (long l = 0L; l < 1000; l++)
         {


[2/3] Preemptive open of compaction results

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9d7729b..8cd8c9f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -17,14 +17,29 @@
  */
 package org.apache.cassandra.io.compress;
 
-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -45,6 +60,7 @@ public class CompressionMetadata
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
     private final Memory chunkOffsets;
+    private final long chunkOffsetsSize;
     public final String indexFilePath;
     public final CompressionParameters parameters;
 
@@ -85,7 +101,7 @@ public class CompressionMetadata
         {
             String compressorName = stream.readUTF();
             int optionCount = stream.readInt();
-            Map<String, String> options = new HashMap<String, String>();
+            Map<String, String> options = new HashMap<>();
             for (int i = 0; i < optionCount; ++i)
             {
                 String key = stream.readUTF();
@@ -114,6 +130,19 @@ public class CompressionMetadata
         {
             FileUtils.closeQuietly(stream);
         }
+        this.chunkOffsetsSize = chunkOffsets.size();
+    }
+
+    private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+    {
+        this.indexFilePath = filePath;
+        this.parameters = parameters;
+        this.dataLength = dataLength;
+        this.compressedFileLength = compressedLength;
+        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+        this.chunkOffsets = offsets;
+        offsets.reference();
+        this.chunkOffsetsSize = offsetsSize;
     }
 
     public ICompressor compressor()
@@ -173,7 +202,7 @@ public class CompressionMetadata
         // position of the chunk
         int idx = 8 * (int) (position / parameters.chunkLength());
 
-        if (idx >= chunkOffsets.size())
+        if (idx >= chunkOffsetsSize)
             throw new CorruptSSTableException(new EOFException(), indexFilePath);
 
         long chunkOffset = chunkOffsets.getLong(idx);
@@ -207,7 +236,7 @@ public class CompressionMetadata
             {
                 long offset = i * 8;
                 long chunkOffset = chunkOffsets.getLong(offset);
-                long nextChunkOffset = offset + 8 == chunkOffsets.size()
+                long nextChunkOffset = offset + 8 == chunkOffsetsSize
                                      ? compressedFileLength
                                      : chunkOffsets.getLong(offset + 8);
                 offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - chunkOffset - 4))); // "4" bytes reserved for checksum
@@ -218,52 +247,60 @@ public class CompressionMetadata
 
     public void close()
     {
-        chunkOffsets.free();
+        if (chunkOffsets instanceof RefCountedMemory)
+            ((RefCountedMemory) chunkOffsets).unreference();
+        else
+            chunkOffsets.free();
     }
 
-    public static class Writer extends RandomAccessFile
+    public static class Writer
     {
-        // place for uncompressed data length in the index file
-        private long dataLengthOffset = -1;
         // path to the file
+        private final CompressionParameters parameters;
         private final String filePath;
+        private int maxCount = 100;
+        private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+        private int count = 0;
 
-        private Writer(String path) throws FileNotFoundException
+        private Writer(CompressionParameters parameters, String path)
         {
-            super(path, "rw");
+            this.parameters = parameters;
             filePath = path;
         }
 
-        public static Writer open(String path)
+        public static Writer open(CompressionParameters parameters, String path)
         {
-            try
-            {
-                return new Writer(path);
-            }
-            catch (FileNotFoundException e)
+            return new Writer(parameters, path);
+        }
+
+        public void addOffset(long offset)
+        {
+            if (count == maxCount)
             {
-                throw new RuntimeException(e);
+                RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+                offsets.unreference();
+                offsets = newOffsets;
             }
+            offsets.setLong(8 * count++, offset);
         }
 
-        public void writeHeader(CompressionParameters parameters)
+        private void writeHeader(DataOutput out, long dataLength, int chunks)
         {
             try
             {
-                writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
-                writeInt(parameters.otherOptions.size());
+                out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+                out.writeInt(parameters.otherOptions.size());
                 for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
                 {
-                    writeUTF(entry.getKey());
-                    writeUTF(entry.getValue());
+                    out.writeUTF(entry.getKey());
+                    out.writeUTF(entry.getValue());
                 }
 
                 // store the length of the chunk
-                writeInt(parameters.chunkLength());
+                out.writeInt(parameters.chunkLength());
                 // store position and reserve a place for uncompressed data length and chunks count
-                dataLengthOffset = getFilePointer();
-                writeLong(-1);
-                writeInt(-1);
+                out.writeLong(dataLength);
+                out.writeInt(chunks);
             }
             catch (IOException e)
             {
@@ -271,36 +308,16 @@ public class CompressionMetadata
             }
         }
 
-        public void finalizeHeader(long dataLength, int chunks)
+        public CompressionMetadata openEarly(long dataLength, long compressedLength)
         {
-            assert dataLengthOffset != -1 : "writeHeader wasn't called";
-
-            long currentPosition;
-            try
-            {
-                currentPosition = getFilePointer();
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, filePath);
-            }
-
-            try
-            {
-                // seek back to the data length position
-                seek(dataLengthOffset);
-
-                // write uncompressed data length and chunks count
-                writeLong(dataLength);
-                writeInt(chunks);
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+        }
 
-                // seek forward to the previous position
-                seek(currentPosition);
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, filePath);
-            }
+        public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
+        {
+            RefCountedMemory newOffsets = offsets.copy(count * 8L);
+            offsets.unreference();
+            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }
 
         /**
@@ -312,33 +329,7 @@ public class CompressionMetadata
          */
         public long chunkOffsetBy(int chunkIndex)
         {
-            if (dataLengthOffset == -1)
-                throw new IllegalStateException("writeHeader wasn't called");
-
-            try
-            {
-                long position = getFilePointer();
-
-                // seek to the position of the given chunk
-                seek(dataLengthOffset
-                     + 8 // size reserved for uncompressed data length
-                     + 4 // size reserved for chunk count
-                     + (chunkIndex * 8L));
-
-                try
-                {
-                    return readLong();
-                }
-                finally
-                {
-                    // back to the original position
-                    seek(position);
-                }
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, filePath);
-            }
+            return offsets.getLong(chunkIndex * 8);
         }
 
         /**
@@ -347,25 +338,17 @@ public class CompressionMetadata
          */
         public void resetAndTruncate(int chunkIndex)
         {
-            try
-            {
-                seek(dataLengthOffset
-                     + 8 // size reserved for uncompressed data length
-                     + 4 // size reserved for chunk count
-                     + (chunkIndex * 8L));
-                getChannel().truncate(getFilePointer());
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, filePath);
-            }
+            count = chunkIndex;
         }
 
-        public void close() throws IOException
+        public void close(long dataLength, int chunks) throws IOException
         {
-            if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
-                getChannel().force(true);
-            super.close();
+            final DataOutputStream out = new DataOutputStream(new FileOutputStream(filePath));
+            assert chunks == count;
+            writeHeader(out, dataLength, chunks);
+            for (int i = 0 ; i < count ; i++)
+                out.writeLong(offsets.getLong(i * 8));
+            out.close();
         }
     }
 

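Writer no longer extends RandomAccessFile and no longer seeks around an index file while chunks are written. It accumulates chunk offsets in an off-heap RefCountedMemory buffer (doubling it when full), can expose a CompressionMetadata over the chunks recorded so far via openEarly() for the preemptive-open path, and only serializes the header plus offsets in close(). A hedged sketch of the call sequence this API implies (the chunk bookkeeping below is illustrative; the real driver is CompressedSequentialWriter, which is not shown in this message):

    import java.io.IOException;
    import org.apache.cassandra.io.compress.CompressionMetadata;
    import org.apache.cassandra.io.compress.CompressionParameters;

    public class CompressionIndexSketch
    {
        // Illustrative only: chunks are {uncompressedSize, compressedSize} pairs.
        static CompressionMetadata writeIndex(CompressionParameters parameters, String indexFilePath, long[][] chunks) throws IOException
        {
            CompressionMetadata.Writer metadataWriter = CompressionMetadata.Writer.open(parameters, indexFilePath);

            long uncompressedLength = 0, compressedLength = 0;
            for (long[] chunk : chunks)
            {
                metadataWriter.addOffset(compressedLength); // where this chunk starts in the compressed file
                uncompressedLength += chunk[0];
                compressedLength += chunk[1];
            }

            // Early open (before close): readers see metadata covering only the chunks recorded so far.
            CompressionMetadata early = metadataWriter.openEarly(uncompressedLength, compressedLength);

            // Final close: header, data length, chunk count and all offsets are written to the index file.
            metadataWriter.close(uncompressedLength, chunks.length);
            return metadataWriter.openAfterClose(uncompressedLength, compressedLength);
        }
    }
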
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 5d20652..7e7b364 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -83,7 +83,7 @@ public abstract class AbstractSSTableSimpleWriter
         int maxGen = 0;
         for (Descriptor desc : existing)
             maxGen = Math.max(maxGen, desc.generation);
-        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, true).filenameFor(Component.DATA);
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP).filenameFor(Component.DATA);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18609bf..b42abf4 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -122,29 +122,41 @@ public class Descriptor
         }
     }
 
+    public static enum Type
+    {
+        TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
+        public final boolean isTemporary;
+        public final String marker;
+        Type(String marker, boolean isTemporary)
+        {
+            this.isTemporary = isTemporary;
+            this.marker = marker;
+        }
+    }
+
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
     public final Version version;
     public final String ksname;
     public final String cfname;
     public final int generation;
-    public final boolean temporary;
+    public final Type type;
     private final int hashCode;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */
-    public Descriptor(File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
     {
         this(Version.CURRENT, directory, ksname, cfname, generation, temp);
     }
 
-    public Descriptor(String version, File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp)
     {
         this(new Version(version), directory, ksname, cfname, generation, temp);
     }
 
-    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp)
     {
         assert version != null && directory != null && ksname != null && cfname != null;
         this.version = version;
@@ -152,13 +164,13 @@ public class Descriptor
         this.ksname = ksname;
         this.cfname = cfname;
         this.generation = generation;
-        temporary = temp;
+        type = temp;
         hashCode = Objects.hashCode(directory, generation, ksname, cfname, temp);
     }
 
     public Descriptor withGeneration(int newGeneration)
     {
-        return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary);
+        return new Descriptor(version, directory, ksname, cfname, newGeneration, type);
     }
 
     public String filenameFor(Component component)
@@ -172,8 +184,8 @@ public class Descriptor
         buff.append(directory).append(File.separatorChar);
         buff.append(ksname).append(separator);
         buff.append(cfname).append(separator);
-        if (temporary)
-            buff.append(SSTable.TEMPFILE_MARKER).append(separator);
+        if (type.isTemporary)
+            buff.append(type.marker).append(separator);
         buff.append(version).append(separator);
         buff.append(generation);
         return buff.toString();
@@ -231,10 +243,15 @@ public class Descriptor
 
         // optional temporary marker
         nexttok = st.nextToken();
-        boolean temporary = false;
-        if (nexttok.equals(SSTable.TEMPFILE_MARKER))
+        Type type = Type.FINAL;
+        if (nexttok.equals(Type.TEMP.marker))
+        {
+            type = Type.TEMP;
+            nexttok = st.nextToken();
+        }
+        else if (nexttok.equals(Type.TEMPLINK.marker))
         {
-            temporary = true;
+            type = Type.TEMPLINK;
             nexttok = st.nextToken();
         }
 
@@ -250,16 +267,16 @@ public class Descriptor
         if (!skipComponent)
             component = st.nextToken();
         directory = directory != null ? directory : new File(".");
-        return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, temporary), component);
+        return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, type), component);
     }
 
     /**
-     * @param temporary temporary flag
+     * @param type the descriptor type (TEMP, TEMPLINK or FINAL)
      * @return A clone of this descriptor with the given 'temporary' status.
      */
-    public Descriptor asTemporary(boolean temporary)
+    public Descriptor asType(Type type)
     {
-        return new Descriptor(version, directory, ksname, cfname, generation, temporary);
+        return new Descriptor(version, directory, ksname, cfname, generation, type);
     }
 
     public IMetadataSerializer getMetadataSerializer()
@@ -296,7 +313,7 @@ public class Descriptor
                        && that.generation == this.generation
                        && that.ksname.equals(this.ksname)
                        && that.cfname.equals(this.cfname)
-                       && that.temporary == this.temporary;
+                       && that.type == this.type;
     }
 
     @Override
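
The boolean 'temporary' flag becomes a three-valued Descriptor.Type, so that preemptively opened files can carry a distinct "tmplink" marker while still being parsed like any other sstable. As a hedged illustration (this helper is not part of the patch; the real parsing is the token loop in Descriptor.fromFilename above), resolving a filename marker back to a Type could look like:

    // Hypothetical helper, shown only to illustrate the enum's marker/isTemporary fields.
    static Descriptor.Type typeFromMarker(String token)
    {
        for (Descriptor.Type t : Descriptor.Type.values())
            if (t.isTemporary && token.equals(t.marker))
                return t;
        return Descriptor.Type.FINAL; // no temporary marker present
    }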

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0696fb7..f53a7e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.MemoryOutputStream;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -60,7 +60,7 @@ public class IndexSummary implements Closeable
     private final IPartitioner partitioner;
     private final int summarySize;
     private final int sizeAtFullSampling;
-    private final Memory bytes;
+    private final RefCountedMemory bytes;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,7 +70,7 @@ public class IndexSummary implements Closeable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling,
+    public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
                         int minIndexInterval, int samplingLevel)
     {
         this.partitioner = partitioner;
@@ -251,7 +251,7 @@ public class IndexSummary implements Closeable
                                                     " the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
             }
 
-            Memory memory = Memory.allocate(offheapSize);
+            RefCountedMemory memory = new RefCountedMemory(offheapSize);
             FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
             return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
@@ -260,6 +260,12 @@ public class IndexSummary implements Closeable
     @Override
     public void close()
     {
-        bytes.free();
+        bytes.unreference();
+    }
+
+    public IndexSummary readOnlyClone()
+    {
+        bytes.reference();
+        return this;
     }
 }
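
IndexSummary now sits on a RefCountedMemory so a preemptively opened reader can share the same off-heap summary as its replacement: readOnlyClone() takes an extra reference and close() releases one, so the underlying memory is presumably freed only once every holder has closed. A minimal sketch of that reference-counting pattern (this is not the real RefCountedMemory; the class and its semantics here are illustrative assumptions):

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of the reference/unreference pattern readOnlyClone()/close() rely on.
    final class RefCounted
    {
        private final AtomicInteger references = new AtomicInteger(1); // creator holds one reference
        private final Runnable onFree;

        RefCounted(Runnable onFree) { this.onFree = onFree; }

        void reference()   { references.incrementAndGet(); }

        void unreference()
        {
            if (references.decrementAndGet() == 0)
                onFree.run(); // free the underlying memory once the last holder lets go
        }
    }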

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index d77e887..8580dce 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 
@@ -34,7 +36,7 @@ public class IndexSummaryBuilder
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
     private final ArrayList<Long> positions;
-    private final ArrayList<byte[]> keys;
+    private final ArrayList<DecoratedKey> keys;
     private final int minIndexInterval;
     private final int samplingLevel;
     private final int[] startPoints;
@@ -69,6 +71,27 @@ public class IndexSummaryBuilder
         keys = new ArrayList<>((int)maxExpectedEntries);
     }
 
+    // finds the last decorated key (stepping back a further 'offset' entries) that is guaranteed to occur fully in the index file before the provided file position
+    public DecoratedKey getMaxReadableKey(long position, int offset)
+    {
+        int i = Collections.binarySearch(positions, position);
+        if (i < 0)
+        {
+            i = -1 - i;
+            if (i == positions.size())
+                i -= 2;
+            else
+                i -= 1;
+        }
+        else
+            i -= 1;
+        i -= offset;
+        // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
+        if (i <= 0)
+            return null;
+        return keys.get(i);
+    }
+
     public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
     {
         if (keysWritten % minIndexInterval == 0)
@@ -86,9 +109,8 @@ public class IndexSummaryBuilder
 
             if (!shouldSkip)
             {
-                byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
-                keys.add(key);
-                offheapSize += key.length;
+                keys.add(decoratedKey);
+                offheapSize += decoratedKey.key.remaining();
                 positions.add(indexPosition);
                 offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
             }
@@ -102,32 +124,51 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
+        return build(partitioner, null);
+    }
+
+    public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+    {
         assert keys.size() > 0;
         assert keys.size() == positions.size();
 
+        int length;
+        if (exclusiveUpperBound == null)
+            length = keys.size();
+        else
+            length = Collections.binarySearch(keys, exclusiveUpperBound);
+
+        assert length > 0;
+
+        long offheapSize = this.offheapSize;
+        if (length < keys.size())
+            for (int i = length ; i < keys.size() ; i++)
+                offheapSize -= keys.get(i).key.remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
+
         // first we write out the position in the *summary* for each key in the summary,
         // then we write out (key, actual index position) pairs
-        Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
+        RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
         int idxPosition = 0;
-        int keyPosition = keys.size() * 4;
-        for (int i = 0; i < keys.size(); i++)
+        int keyPosition = length * 4;
+        for (int i = 0; i < length; i++)
         {
             // write the position of the actual entry in the index summary (4 bytes)
             memory.setInt(idxPosition, keyPosition);
             idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
 
             // write the key
-            byte[] keyBytes = keys.get(i);
-            memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
-            keyPosition += keyBytes.length;
+            ByteBuffer keyBytes = keys.get(i).key;
+            memory.setBytes(keyPosition, keyBytes);
+            keyPosition += keyBytes.remaining();
 
             // write the position in the actual index file
             long actualIndexPosition = positions.get(i);
             memory.setLong(keyPosition, actualIndexPosition);
             keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
         }
+        assert keyPosition == offheapSize + (length * 4);
         int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
-        return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, minIndexInterval, samplingLevel);
+        return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
     }
 
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -190,7 +231,7 @@ public class IndexSummaryBuilder
 
         // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
         // stores the position of the actual entries in the summary.
-        Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+        RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
 
         // Copy old entries to our new Memory.
         int idxPosition = 0;
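
getMaxReadableKey leans on the return convention of Collections.binarySearch: a miss returns (-(insertionPoint) - 1), so (-1 - result) is the index of the first entry past the probed position, and stepping back from it (plus the caller-supplied offset) gives the last entry known to start before it. A tiny worked example of that convention:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class BinarySearchConvention
    {
        public static void main(String[] args)
        {
            List<Long> positions = Arrays.asList(0L, 100L, 250L, 400L);
            int miss = Collections.binarySearch(positions, 260L);
            System.out.println(miss);            // -4: not found, insertion point is 3
            System.out.println(-1 - miss);       // 3: index of 400L, the first entry past 260
            System.out.println((-1 - miss) - 1); // 2: index of 250L, the last entry starting before 260
        }
    }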

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5ee9bdb..247343e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -56,8 +56,6 @@ public abstract class SSTable
 {
     static final Logger logger = LoggerFactory.getLogger(SSTable.class);
 
-    public static final String TEMPFILE_MARKER = "tmp";
-
     public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
 
     public final Descriptor descriptor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index c330c88..7b9d135 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -79,7 +79,7 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
                 Descriptor desc = p == null ? null : p.left;
-                if (p == null || !p.right.equals(Component.DATA) || desc.temporary)
+                if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
                     return false;
 
                 if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8e359bd..c84eec2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -193,6 +193,7 @@ public class SSTableReader extends SSTable
     private SSTableReader replacedBy;
     private SSTableReader replaces;
     private SSTableDeletingTask deletingTask;
+    private Runnable runOnClose;
 
     @VisibleForTesting
     public RestorableMeter readMeter;
@@ -340,7 +341,7 @@ public class SSTableReader extends SSTable
         // special implementation of load to use non-pooled SegmentedFile builders
         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
         SegmentedFile.Builder dbuilder = sstable.compression
-                                       ? new CompressedSegmentedFile.Builder()
+                                       ? new CompressedSegmentedFile.Builder(null)
                                        : new BufferedSegmentedFile.Builder();
         if (!sstable.loadSummary(ibuilder, dbuilder))
             sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
@@ -555,7 +556,7 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
+            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 
             if (replacedBy != null)
             {
@@ -563,7 +564,7 @@ public class SSTableReader extends SSTable
                 closeSummary = replacedBy.indexSummary != indexSummary;
                 closeFiles = replacedBy.dfile != dfile;
                 // if the replacement sstablereader uses a different path, clean up our paths
-                deleteFile = !dfile.path.equals(replacedBy.dfile.path);
+                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
             }
 
             if (replaces != null)
@@ -571,7 +572,7 @@ public class SSTableReader extends SSTable
                 closeBf &= replaces.bf != bf;
                 closeSummary &= replaces.indexSummary != indexSummary;
                 closeFiles &= replaces.dfile != dfile;
-                deleteFile &= !dfile.path.equals(replaces.dfile.path);
+                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
             }
 
             boolean deleteAll = false;
@@ -597,12 +598,15 @@ public class SSTableReader extends SSTable
                     replacedBy.replaces = replaces;
             }
 
-            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
         }
     }
 
     private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
     {
+        if (references.get() != 0)
+            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
         final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
         final OpOrder.Barrier barrier;
         if (cfs != null)
@@ -619,7 +623,6 @@ public class SSTableReader extends SSTable
             {
                 if (barrier != null)
                     barrier.await();
-                assert references.get() == 0;
                 if (closeBf)
                     bf.close();
                 if (closeSummary)
@@ -629,6 +632,8 @@ public class SSTableReader extends SSTable
                     ifile.cleanup();
                     dfile.cleanup();
                 }
+                if (runOnClose != null)
+                    runOnClose.run();
                 if (deleteAll)
                 {
                     /**
@@ -650,6 +655,16 @@ public class SSTableReader extends SSTable
         });
     }
 
+    public boolean equals(Object that)
+    {
+        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+    }
+
+    public int hashCode()
+    {
+        return this.descriptor.hashCode();
+    }
+
     public String getFilename()
     {
         return dfile.path;
@@ -894,6 +909,53 @@ public class SSTableReader extends SSTable
         }
     }
 
+    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+    {
+        synchronized (replaceLock)
+        {
+            assert replacedBy == null;
+
+            if (newStart.compareTo(this.first) > 0)
+            {
+                if (newStart.compareTo(this.last) > 0)
+                {
+                    this.runOnClose = new Runnable()
+                    {
+                        public void run()
+                        {
+                            CLibrary.trySkipCache(dfile.path, 0, 0);
+                            CLibrary.trySkipCache(ifile.path, 0, 0);
+                            runOnClose.run();
+                        }
+                    };
+                }
+                else
+                {
+                    final long dataStart = getPosition(newStart, Operator.GE).position;
+                    final long indexStart = getIndexScanPosition(newStart);
+                    this.runOnClose = new Runnable()
+                    {
+                        public void run()
+                        {
+                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
+                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
+                            runOnClose.run();
+                        }
+                    };
+                }
+            }
+
+            if (readMeterSyncFuture != null)
+                readMeterSyncFuture.cancel(false);
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata);
+            replacement.readMeter = this.readMeter;
+            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+            replacement.last = this.last;
+            setReplacedBy(replacement);
+            return replacement;
+        }
+    }
+
     /**
      * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
      * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
@@ -1022,7 +1084,7 @@ public class SSTableReader extends SSTable
         return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
     }
 
-    public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+    private static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
     {
         if (binarySearchResult == -1)
             return -1;
@@ -1245,6 +1307,12 @@ public class SSTableReader extends SSTable
         return positions;
     }
 
+    public void invalidateCacheKey(DecoratedKey key)
+    {
+        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.key);
+        keyCache.remove(cacheKey);
+    }
+
     public void cacheKey(DecoratedKey key, RowIndexEntry info)
     {
         CachingOptions caching = metadata.getCaching();
@@ -1261,29 +1329,6 @@ public class SSTableReader extends SSTable
         keyCache.put(cacheKey, info);
     }
 
-    public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
-    {
-        RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
-
-        try
-        {
-            int fd = CLibrary.getfd(f.getFD());
-
-            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
-            {
-                cacheKey(entry.getKey(), entry.getValue());
-
-                // add to the cache but don't do actual preheating if we have it disabled in the config
-                if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
-                    CLibrary.preheatPage(fd, entry.getValue().position);
-            }
-        }
-        finally
-        {
-            FileUtils.closeQuietly(f);
-        }
-    }
-
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
         return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.key), updateStats);
@@ -1662,6 +1707,20 @@ public class SSTableReader extends SSTable
         return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
     }
 
+    public SSTableReader getCurrentReplacement()
+    {
+        synchronized (replaceLock)
+        {
+            SSTableReader cur = this, next = replacedBy;
+            while (next != null)
+            {
+                cur = next;
+                next = next.replacedBy;
+            }
+            return cur;
+        }
+    }
+
     /**
      * TODO: Move someplace reusable
      */
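
cloneWithNewStart produces a replacement reader over the same files but with a later first key, and stashes a runOnClose hook so that when the superseded reader is finally released, its no-longer-needed prefix can be dropped from the page cache. A hedged sketch of the call pattern (the real call sites are in SSTableRewriter, added below; shrinkSource and its arguments are assumptions for illustration):

    // Once newSSTable covers the prefix of source, the source reader only needs to serve
    // keys from newSSTable.last onwards.
    static SSTableReader shrinkSource(SSTableReader source, SSTableReader newSSTable)
    {
        return source.cloneWithNewStart(newSSTable.last, new Runnable()
        {
            public void run()
            {
                // runs when the old reader is finally closed; a natural place to expire
                // cache entries that referenced the superseded region
            }
        });
    }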

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
new file mode 100644
index 0000000..2dfefc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.CLibrary;
+
+public class SSTableRewriter
+{
+
+    private static final long preemptiveOpenInterval;
+    static
+    {
+        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+        if (interval < 0)
+            interval = Long.MAX_VALUE;
+        preemptiveOpenInterval = interval;
+    }
+
+    private final DataTracker dataTracker;
+    private final ColumnFamilyStore cfs;
+
+    private final long maxAge;
+    private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
+    private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
+    private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
+
+    private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
+    private long currentlyOpenedEarlyAt; // the position (in bytes) in the target file we last (re)opened at
+
+    private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+    private final OperationType rewriteType; // the type of rewrite/compaction being performed
+    private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+
+    private SSTableWriter writer;
+    private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+    {
+        this.rewriting = rewriting;
+        for (SSTableReader sstable : rewriting)
+        {
+            originalStarts.put(sstable.descriptor, sstable.first);
+            fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
+        }
+        this.dataTracker = cfs.getDataTracker();
+        this.cfs = cfs;
+        this.maxAge = maxAge;
+        this.rewriteType = rewriteType;
+        this.isOffline = isOffline;
+    }
+
+    public SSTableWriter currentWriter()
+    {
+        return writer;
+    }
+
+    public RowIndexEntry append(AbstractCompactedRow row)
+    {
+        // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
+        maybeReopenEarly(row.key);
+        RowIndexEntry index = writer.append(row);
+        if (!isOffline)
+        {
+            if (index == null)
+            {
+                cfs.invalidateCachedRow(row.key);
+            }
+            else
+            {
+                boolean save = false;
+                for (SSTableReader reader : rewriting)
+                {
+                    if (reader.getCachedPosition(row.key, false) != null)
+                    {
+                        save = true;
+                        break;
+                    }
+                }
+                if (save)
+                    cachedKeys.put(row.key, index);
+            }
+        }
+        return index;
+    }
+
+    // attempts to append the row, if fails resets the writer position
+    public RowIndexEntry tryAppend(AbstractCompactedRow row)
+    {
+        mark();
+        try
+        {
+            return append(row);
+        }
+        catch (Throwable t)
+        {
+            resetAndTruncate();
+            throw t;
+        }
+    }
+
+    private void mark()
+    {
+        writer.mark();
+    }
+
+    private void resetAndTruncate()
+    {
+        writer.resetAndTruncate();
+    }
+
+    private void maybeReopenEarly(DecoratedKey key)
+    {
+        if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
+        {
+            if (isOffline)
+            {
+                for (SSTableReader reader : rewriting)
+                {
+                    RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
+                    CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
+                }
+            }
+            else
+            {
+                SSTableReader reader = writer.openEarly(maxAge);
+                if (reader != null)
+                {
+                    replaceReader(currentlyOpenedEarly, reader);
+                    currentlyOpenedEarly = reader;
+                    currentlyOpenedEarlyAt = writer.getFilePointer();
+                    moveStarts(reader, Functions.constant(reader.last), false);
+                }
+            }
+        }
+    }
+
+    public void abort()
+    {
+        if (writer == null)
+            return;
+        moveStarts(null, Functions.forMap(originalStarts), true);
+        List<SSTableReader> close = new ArrayList<>(finished);
+        if (currentlyOpenedEarly != null)
+            close.add(currentlyOpenedEarly);
+        // also remove already completed SSTables
+        for (SSTableReader sstable : close)
+            sstable.markObsolete();
+        // releases reference in replaceReaders
+        if (!isOffline)
+        {
+            dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList());
+            dataTracker.unmarkCompacting(close);
+        }
+        writer.abort();
+    }
+
+    /**
+     * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
+     * needed, and transferring any key cache entries over to the new reader, expiring them from the old. If reset
+     * is true, we instead restore the starts of the readers to their values from before the rewriting began.
+     *
+     * @param newReader the rewritten reader that replaces them for this region
+     * @param newStarts a function mapping a reader's descriptor to its new start value
+     * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
+     */
+    private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+    {
+        if (isOffline)
+            return;
+        List<SSTableReader> toReplace = new ArrayList<>();
+        List<SSTableReader> replaceWith = new ArrayList<>();
+        final List<DecoratedKey> invalidateKeys = new ArrayList<>();
+        if (!reset)
+        {
+            invalidateKeys.addAll(cachedKeys.keySet());
+            for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+                newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
+        }
+        cachedKeys = new HashMap<>();
+        for (final SSTableReader sstable : rewriting)
+        {
+            DecoratedKey newStart = newStarts.apply(sstable.descriptor);
+            assert newStart != null;
+            if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+            {
+                toReplace.add(sstable);
+                // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+                // note: only one such writer should be written to at any moment
+                replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+                {
+                    public void run()
+                    {
+                        // this is somewhat racy, in that we could theoretically be closing this old reader
+                        // when an even older reader is still in use, but it's not likely to have any major impact
+                        for (DecoratedKey key : invalidateKeys)
+                            sstable.invalidateCacheKey(key);
+                    }
+                }));
+            }
+        }
+        replaceReaders(toReplace, replaceWith);
+        rewriting.removeAll(toReplace);
+        rewriting.addAll(replaceWith);
+    }
+
+    private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith)
+    {
+        if (isOffline)
+            return;
+        Set<SSTableReader> toReplaceSet;
+        if (toReplace != null)
+        {
+            toReplace.setReplacedBy(replaceWith);
+            toReplaceSet = Collections.singleton(toReplace);
+        }
+        else
+        {
+            dataTracker.markCompacting(Collections.singleton(replaceWith));
+            toReplaceSet = Collections.emptySet();
+        }
+        replaceReaders(toReplaceSet, Collections.singleton(replaceWith));
+    }
+
+    private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        if (isOffline)
+            return;
+        dataTracker.replaceReaders(toReplace, replaceWith);
+    }
+
+    public void switchWriter(SSTableWriter newWriter)
+    {
+        if (writer == null)
+        {
+            writer = newWriter;
+            return;
+        }
+        // tmp = false because later we want to query it with descriptor from SSTableReader
+        SSTableReader reader = writer.closeAndOpenReader(maxAge);
+        finished.add(reader);
+        replaceReader(currentlyOpenedEarly, reader);
+        moveStarts(reader, Functions.constant(reader.last), false);
+        currentlyOpenedEarly = null;
+        currentlyOpenedEarlyAt = 0;
+        writer = newWriter;
+    }
+
+    public void finish()
+    {
+        finish(-1);
+    }
+    public void finish(long repairedAt)
+    {
+        finish(true, repairedAt);
+    }
+    public void finish(boolean cleanupOldReaders)
+    {
+        finish(cleanupOldReaders, -1);
+    }
+    public void finish(boolean cleanupOldReaders, long repairedAt)
+    {
+        if (writer.getFilePointer() > 0)
+        {
+            SSTableReader reader = repairedAt < 0 ?
+                                    writer.closeAndOpenReader(maxAge) :
+                                    writer.closeAndOpenReader(maxAge, repairedAt);
+            finished.add(reader);
+            replaceReader(currentlyOpenedEarly, reader);
+            moveStarts(reader, Functions.constant(reader.last), false);
+        }
+        else
+        {
+            writer.abort();
+            writer = null;
+        }
+
+        if (!isOffline)
+        {
+            dataTracker.unmarkCompacting(finished);
+            if (cleanupOldReaders)
+                dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
+        }
+        else if (cleanupOldReaders)
+        {
+            for (SSTableReader reader : rewriting)
+            {
+                reader.markObsolete();
+                reader.releaseReference();
+            }
+        }
+    }
+
+    public List<SSTableReader> finished()
+    {
+        return finished;
+    }
+}
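
SSTableRewriter is the coordination point for preemptive opening: callers switch in a writer, append compacted rows, and the rewriter periodically opens the partially written output early, shrinking the source readers as it goes; finish() publishes the completed sstables and retires the originals. A hedged usage sketch (createWriter, rows, shouldSwitchOutputFile, sstablesToRewrite and maxAge are placeholders, not part of the patch):

    SSTableRewriter rewriter = new SSTableRewriter(cfs, sstablesToRewrite, maxAge, OperationType.COMPACTION, false);
    rewriter.switchWriter(createWriter(cfs));          // first output file
    for (AbstractCompactedRow row : rows)
    {
        rewriter.tryAppend(row);                       // appends, resetting the writer position on failure
        if (shouldSwitchOutputFile())                  // e.g. a size threshold (assumption)
            rewriter.switchWriter(createWriter(cfs));  // closes the current output and starts the next
    }
    rewriter.finish();                                 // publish finished sstables, retire the rewritten ones
    List<SSTableReader> newSSTables = rewriter.finished();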

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4a7729e..1c9c5fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,26 +17,51 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnIndex;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
@@ -110,17 +135,16 @@ public class SSTableWriter extends SSTable
 
         if (compression)
         {
-            dbuilder = SegmentedFile.getCompressedBuilder();
             dataFile = SequentialWriter.open(getFilename(),
                                              descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                             !metadata.populateIoCacheOnFlush(),
                                              metadata.compressionParameters(),
                                              sstableMetadataCollector);
+            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
         }
         else
         {
+            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
         }
 
         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -299,9 +323,16 @@ public class SSTableWriter extends SSTable
      */
     public void abort()
     {
-        assert descriptor.temporary;
-        FileUtils.closeQuietly(iwriter);
-        FileUtils.closeQuietly(dataFile);
+        assert descriptor.type.isTemporary;
+        if (iwriter == null && dataFile == null)
+            return;
+        if (iwriter != null)
+        {
+            FileUtils.closeQuietly(iwriter.indexFile);
+            iwriter.bf.close();
+        }
+        if (dataFile != null)
+            FileUtils.closeQuietly(dataFile);
 
         Set<Component> components = SSTable.componentsFor(descriptor);
         try
@@ -326,6 +357,54 @@ public class SSTableWriter extends SSTable
         last = lastWrittenKey = getMinimalKey(last);
     }
 
+    public SSTableReader openEarly(long maxDataAge)
+    {
+        StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                  metadata.getBloomFilterFpChance(),
+                                                  repairedAt).get(MetadataType.STATS);
+
+        // find the max (exclusive) readable key
+        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+        if (exclusiveUpperBoundOfReadableIndex == null)
+            return null;
+
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+
+        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable from a finished sstable to other consumers
+        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+                                                           components, metadata,
+                                                           partitioner, ifile,
+                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+                                                           iwriter.bf, maxDataAge, sstableMetadata);
+
+        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+        if (inclusiveUpperBoundOfReadableData == null)
+            return null;
+        int offset = 2;
+        while (true)
+        {
+            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+                break;
+            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+            if (inclusiveUpperBoundOfReadableData == null)
+                return null;
+        }
+        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+        return sstable;
+    }
+
     public SSTableReader closeAndOpenReader()
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -395,7 +474,7 @@ public class SSTableWriter extends SSTable
 
     private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
-        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
+        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
         try
         {
             desc.getMetadataSerializer().serialize(components, out.stream);
@@ -412,7 +491,7 @@ public class SSTableWriter extends SSTable
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
     {
-        Descriptor newdesc = tmpdesc.asTemporary(false);
+        Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
         rename(tmpdesc, newdesc, components);
         return newdesc;
     }
@@ -454,13 +533,19 @@ public class SSTableWriter extends SSTable
 
         IndexWriter(long keyCount)
         {
-            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
-                                              !metadata.populateIoCacheOnFlush());
+            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
             summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
         }
 
+        // finds the last decorated key (stepping back a further 'offset' entries) that is guaranteed to occur fully in the flushed portion of the index file
+        DecoratedKey getMaxReadableKey(int offset)
+        {
+            long maxIndexLength = indexFile.getLastFlushOffset();
+            return summary.getMaxReadableKey(maxIndexLength, offset);
+        }
+
         public void append(DecoratedKey key, RowIndexEntry indexEntry)
         {
             bf.add(key.key);
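
openEarly hard-links the in-progress data and index files under a tmplink descriptor, opens them with a FINAL-typed descriptor, and then walks the candidate last key backwards until its row is provably within the flushed portion of the data file (a row is safe once the position of the row after it is below getLastFlushOffset). A self-contained sketch of that "walk back until flushed" check, with assumed inputs standing in for the sstable internals:

    // dataPositions[i] is where row i starts in the data file; row i is safe to expose once
    // the next row's start (an upper bound on row i's end) has already been flushed.
    static int lastFullyFlushed(long[] dataPositions, long flushedDataLength)
    {
        for (int i = dataPositions.length - 2; i >= 0; i--)
            if (dataPositions[i + 1] <= flushedDataLength)
                return i;
        return -1; // nothing fully flushed yet
    }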

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84789a6..7ba2895 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,15 +19,25 @@ package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;
@@ -246,8 +256,8 @@ public class MetadataCollector
                                                              compressionRatio,
                                                              estimatedTombstoneDropTime,
                                                              sstableLevel,
-                                                             minColumnNames,
-                                                             maxColumnNames,
+                                                             ImmutableList.copyOf(minColumnNames),
+                                                             ImmutableList.copyOf(maxColumnNames),
                                                              hasLegacyCounterShards,
                                                              repairedAt));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
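
Wrapping the min/max column names in ImmutableList.copyOf snapshots them when the metadata is built, presumably so the StatsMetadata handed to an early-opened reader cannot change underneath it if the collector keeps accumulating. A small illustration of what the copy buys (not Cassandra code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import com.google.common.collect.ImmutableList;

    public class SnapshotExample
    {
        public static void main(String[] args)
        {
            List<String> live = new ArrayList<>(Arrays.asList("a", "b"));
            List<String> snapshot = ImmutableList.copyOf(live);
            live.add("c");
            System.out.println(snapshot); // [a, b] -- unaffected by the later add
        }
    }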

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index c0cd96e..7414208 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -146,7 +146,7 @@ public class MetadataSerializer implements IMetadataSerializer
 
     private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
     {
-        Descriptor tmpDescriptor = descriptor.asTemporary(true);
+        Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
 
         try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 6a23fde..b284f61 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             long length = new File(path).length();
             return new BufferedPoolingSegmentedFile(path, length);
         }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
     }
 
     protected RandomAccessReader createReader(String path)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 790b42b..aa031e3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedSegmentedFile extends SegmentedFile
             long length = new File(path).length();
             return new BufferedSegmentedFile(path, length);
         }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
     }
 
     public FileDataInput getSegment(long position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 3c4c257..98492da 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -9,10 +9,10 @@ public class ChecksummedSequentialWriter extends SequentialWriter
     private final SequentialWriter crcWriter;
     private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 
-    public ChecksummedSequentialWriter(File file, int bufferSize, boolean skipIOCache, File crcPath)
+    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
     {
-        super(file, bufferSize, skipIOCache);
-        crcWriter = new SequentialWriter(crcPath, 8 * 1024, true);
+        super(file, bufferSize);
+        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
         crcMetadata.writeChunkSize(buffer.length);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 121bdb2..1803e69 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
@@ -30,8 +31,13 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
         this.metadata = metadata;
     }
 
-    public static class Builder extends SegmentedFile.Builder
+    public static class Builder extends CompressedSegmentedFile.Builder
     {
+        public Builder(CompressedSequentialWriter writer)
+        {
+            super(writer);
+        }
+
         public void addPotentialBoundary(long boundary)
         {
             // only one segment in a standard-io file
@@ -39,7 +45,12 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
 
         public SegmentedFile complete(String path)
         {
-            return new CompressedPoolingSegmentedFile(path, CompressionMetadata.create(path));
+            return new CompressedPoolingSegmentedFile(path, metadata(path, false));
+        }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return new CompressedPoolingSegmentedFile(path, metadata(path, true));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index d0ea3fd..4afe0a0 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
@@ -32,14 +33,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public static class Builder extends SegmentedFile.Builder
     {
+        protected final CompressedSequentialWriter writer;
+        public Builder(CompressedSequentialWriter writer)
+        {
+            this.writer = writer;
+        }
+
         public void addPotentialBoundary(long boundary)
         {
             // only one segment in a standard-io file
         }
 
+        protected CompressionMetadata metadata(String path, boolean early)
+        {
+            if (writer == null)
+                return CompressionMetadata.create(path);
+            else if (early)
+                return writer.openEarly();
+            else
+                return writer.openAfterClose();
+        }
+
         public SegmentedFile complete(String path)
         {
-            return new CompressedSegmentedFile(path, CompressionMetadata.create(path));
+            return new CompressedSegmentedFile(path, metadata(path, false));
+        }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return new CompressedSegmentedFile(path, metadata(path, true));
         }
     }
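
For readers tracing the new builder wiring: a Builder constructed with the live CompressedSequentialWriter can produce compression metadata either for the partially written file (openEarly) or for the finished one (openAfterClose), while a Builder without a writer still falls back to reading the metadata from disk as before. A minimal sketch of that three-way decision, using simplified stand-in types rather than the real Cassandra classes:

    // Simplified stand-ins; this only illustrates the decision above, not the real API.
    final class CompressionInfo
    {
        final long dataLength;                      // how much of the data file the metadata covers
        CompressionInfo(long dataLength) { this.dataLength = dataLength; }
    }

    interface LiveCompressedWriter
    {
        CompressionInfo openEarly();                // covers only the chunks flushed so far
        CompressionInfo openAfterClose();           // covers the whole finished file
    }

    final class MetadataChooser
    {
        private final LiveCompressedWriter writer;  // null when opening a pre-existing sstable

        MetadataChooser(LiveCompressedWriter writer) { this.writer = writer; }

        CompressionInfo metadata(long onDiskLength, boolean early)
        {
            if (writer == null)
                return new CompressionInfo(onDiskLength); // already complete: read metadata off disk
            return early ? writer.openEarly() : writer.openAfterClose();
        }
    }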
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 15d890c..875c9d5 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -74,6 +74,11 @@ public class FileUtils
         canCleanDirectBuffers = canClean;
     }
 
+    public static void createHardLink(String from, String to)
+    {
+        createHardLink(new File(from), new File(to));
+    }
+
     public static void createHardLink(File from, File to)
     {
         if (to.exists())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 278e0f6..b8a46bc 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import com.sun.jna.Native;
-import com.sun.jna.Pointer;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
 
 /**
  * An off-heap region of memory that must be manually free'd when no longer needed.
@@ -145,6 +145,24 @@ public class Memory
         }
     }
 
+    public void setBytes(long memoryOffset, ByteBuffer buffer)
+    {
+        if (buffer == null)
+            throw new NullPointerException();
+        else if (buffer.remaining() == 0)
+            return;
+        checkPosition(memoryOffset + buffer.remaining());
+        if (buffer.hasArray())
+        {
+            setBytes(memoryOffset, buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        }
+        else if (buffer instanceof DirectBuffer)
+        {
+            unsafe.copyMemory(((DirectBuffer) buffer).address() + buffer.position(), peer + memoryOffset, buffer.remaining());
+        }
+        else
+            throw new IllegalStateException();
+    }
     /**
      * Transfers count bytes from buffer to Memory
      *
@@ -263,6 +281,18 @@ public class Memory
         assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
     }
 
+    public void put(long trgOffset, Memory memory, long srcOffset, long size)
+    {
+        unsafe.copyMemory(memory.peer + srcOffset, peer + trgOffset, size);
+    }
+
+    public Memory copy(long newSize)
+    {
+        Memory copy = Memory.allocate(newSize);
+        copy.put(0, this, 0, Math.min(size(), newSize));
+        return copy;
+    }
+
     public void free()
     {
         assert peer != 0;
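
setBytes now has to accept both flavours of ByteBuffer: an array-backed buffer exposes its backing byte[] (offset by arrayOffset() + position()), while a direct buffer is copied from its raw address. The sketch below shows the same branch using only public JDK APIs (no Unsafe), reading through a duplicate so the source buffer's position is left untouched:

    import java.nio.ByteBuffer;

    public class ByteBufferCopyDemo
    {
        // Copy the remaining bytes of src into dst without disturbing src's position,
        // branching the same way Memory.setBytes does between heap and direct buffers.
        static void copyRemaining(ByteBuffer src, byte[] dst, int dstOffset)
        {
            if (src.hasArray())
                System.arraycopy(src.array(), src.arrayOffset() + src.position(), dst, dstOffset, src.remaining());
            else
                src.duplicate().get(dst, dstOffset, src.remaining()); // duplicate() shares content, not position
        }

        public static void main(String[] args)
        {
            ByteBuffer heap = ByteBuffer.wrap("heap".getBytes());
            ByteBuffer direct = ByteBuffer.allocateDirect(6);
            direct.put("direct".getBytes()).flip();

            byte[] out = new byte[10];
            copyRemaining(heap, out, 0);
            copyRemaining(direct, out, 4);
            System.out.println(new String(out)); // prints "heapdirect"
        }
    }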

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 39a4160..450553b 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -161,28 +161,36 @@ public class MmappedSegmentedFile extends SegmentedFile
         public SegmentedFile complete(String path)
         {
             long length = new File(path).length();
-            // add a sentinel value == length
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
             // create the segments
             return new MmappedSegmentedFile(path, length, createSegments(path));
         }
 
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
+
         private Segment[] createSegments(String path)
         {
-            int segcount = boundaries.size() - 1;
-            Segment[] segments = new Segment[segcount];
             RandomAccessFile raf;
-
+            long length;
             try
             {
                 raf = new RandomAccessFile(path, "r");
+                length = raf.length();
             }
-            catch (FileNotFoundException e)
+            catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
 
+            // add a sentinel value == length
+            List<Long> boundaries = new ArrayList<>(this.boundaries);
+            if (length != boundaries.get(boundaries.size() - 1))
+                boundaries.add(length);
+            int segcount = boundaries.size() - 1;
+            Segment[] segments = new Segment[segcount];
+
             try
             {
                 for (int i = 0; i < segcount; i++)
@@ -221,7 +229,7 @@ public class MmappedSegmentedFile extends SegmentedFile
             super.deserializeBounds(in);
 
             int size = in.readInt();
-            List<Long> temp = new ArrayList<Long>(size);
+            List<Long> temp = new ArrayList<>(size);
             
             for (int i = 0; i < size; i++)
                 temp.add(in.readLong());
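
The notable change in createSegments is that the shared boundary list is copied before the end-of-file sentinel is appended, and the file length is re-read when the segments are built, so the builder can be asked for segments more than once (for instance an early open followed by the final open) without the original list growing each time. A rough sketch of that pattern, with plain offsets standing in for the real Segment type:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: copy-then-append keeps repeated calls from mutating shared state.
    class SegmentBoundaries
    {
        private final List<Long> boundaries = new ArrayList<>();

        void addPotentialBoundary(long boundary)
        {
            boundaries.add(boundary);
        }

        long[] segmentsFor(long fileLength)
        {
            List<Long> copy = new ArrayList<>(boundaries);       // defensive copy of the shared list
            if (copy.isEmpty() || copy.get(copy.size() - 1) != fileLength)
                copy.add(fileLength);                            // sentinel == current file length
            long[] result = new long[copy.size()];
            for (int i = 0; i < result.length; i++)
                result[i] = copy.get(i);
            return result;
        }
    }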

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 892611c..01f4e31 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
+    final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
     protected PoolingSegmentedFile(String path, long length)
     {
         super(path, length);
@@ -33,7 +34,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = FileCacheService.instance.get(path);
+        RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
 
         if (reader == null)
             reader = createReader(path);
@@ -46,11 +47,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public void recycle(RandomAccessReader reader)
     {
-        FileCacheService.instance.put(reader);
+        FileCacheService.instance.put(cacheKey, reader);
     }
 
     public void cleanup()
     {
-        FileCacheService.instance.invalidate(path);
+        FileCacheService.instance.invalidate(cacheKey, path);
     }
 }
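
Keying the pooled readers by a per-instance CacheKey instead of the file path matters once sstables can be opened preemptively: the early-opened instance and the final instance may cover the very same path, and recycling a reader from one into the pool of the other would be wrong. A loose illustration of per-instance pooling, keyed by the owning object rather than by path (not the actual FileCacheService):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ConcurrentMap;

    class ReaderPool<R>
    {
        private final ConcurrentMap<Object, Queue<R>> pools = new ConcurrentHashMap<>();

        R acquire(Object owner)
        {
            Queue<R> q = pools.get(owner);
            return q == null ? null : q.poll();       // null means: caller creates a fresh reader
        }

        void recycle(Object owner, R reader)
        {
            Queue<R> q = pools.get(owner);
            if (q == null)
            {
                Queue<R> fresh = new ConcurrentLinkedQueue<>();
                q = pools.putIfAbsent(owner, fresh);
                if (q == null)
                    q = fresh;
            }
            q.add(reader);
        }

        void invalidate(Object owner)
        {
            pools.remove(owner);                      // drop every pooled reader for this instance
        }
    }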

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index d4da177..be549a6 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -75,7 +76,12 @@ public abstract class SegmentedFile
 
     public static Builder getCompressedBuilder()
     {
-        return new CompressedPoolingSegmentedFile.Builder();
+        return getCompressedBuilder(null);
+    }
+
+    public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
+    {
+        return new CompressedPoolingSegmentedFile.Builder(writer);
     }
 
     public abstract FileDataInput getSegment(long position);
@@ -111,6 +117,12 @@ public abstract class SegmentedFile
          */
         public abstract SegmentedFile complete(String path);
 
+        /**
+         * Called before the writer has finished, to open a SegmentedFile over only the data written so far.
+         * @param path The file on disk.
+         */
+        public abstract SegmentedFile openEarly(String path);
+
         public void serializeBounds(DataOutput out) throws IOException
         {
             out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 1d8b95e..7a7eb63 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -45,7 +45,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private final String filePath;
 
     protected byte[] buffer;
-    private final boolean skipIOCache;
     private final int fd;
     private final int directoryFD;
     // directory should be synced only after first file sync, in other words, only once per file
@@ -56,9 +55,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     protected final RandomAccessFile out;
 
-    // used if skip I/O cache was enabled
-    private long ioCacheStartOffset = 0, bytesSinceCacheFlush = 0;
-
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
     private boolean trickleFsync;
@@ -66,8 +62,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private int bytesSinceTrickleFsync = 0;
 
     public final DataOutputPlus stream;
+    protected long lastFlushOffset;
 
-    public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
+    public SequentialWriter(File file, int bufferSize)
     {
         try
         {
@@ -81,7 +78,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         filePath = file.getAbsolutePath();
 
         buffer = new byte[bufferSize];
-        this.skipIOCache = skipIOCache;
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
 
@@ -100,31 +96,25 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     public static SequentialWriter open(File file)
     {
-        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
-    }
-
-    public static SequentialWriter open(File file, boolean skipIOCache)
-    {
-        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
+        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
     }
 
-    public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
+    public static SequentialWriter open(File file, int bufferSize)
     {
-        return new SequentialWriter(file, bufferSize, skipIOCache);
+        return new SequentialWriter(file, bufferSize);
     }
 
-    public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+    public static ChecksummedSequentialWriter open(File file, File crcPath)
     {
-        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, crcPath);
     }
 
     public static CompressedSequentialWriter open(String dataFilePath,
                                                   String offsetsPath,
-                                                  boolean skipIOCache,
                                                   CompressionParameters parameters,
                                                   MetadataCollector sstableMetadataCollector)
     {
-        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
     public void write(int value) throws ClosedChannelException
@@ -302,23 +292,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
                 }
             }
 
-            if (skipIOCache)
-            {
-                // we don't know when the data reaches disk since we aren't
-                // calling flush
-                // so we continue to clear pages we don't need from the first
-                // offset we see
-                // periodically we update this starting offset
-                bytesSinceCacheFlush += validBufferBytes;
-
-                if (bytesSinceCacheFlush >= RandomAccessReader.CACHE_FLUSH_INTERVAL_IN_BYTES)
-                {
-                    CLibrary.trySkipCache(this.fd, ioCacheStartOffset, 0);
-                    ioCacheStartOffset = bufferOffset;
-                    bytesSinceCacheFlush = 0;
-                }
-            }
-
             // Remember that we wrote, so we don't write it again on next flush().
             resetBuffer();
 
@@ -335,6 +308,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         try
         {
             out.write(buffer, 0, validBufferBytes);
+            lastFlushOffset += validBufferBytes;
         }
         catch (IOException e)
         {
@@ -431,6 +405,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         resetBuffer();
     }
 
+    public long getLastFlushOffset()
+    {
+        return lastFlushOffset;
+    }
+
     public void truncate(long toSize)
     {
         try
@@ -458,9 +437,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
         buffer = null;
 
-        if (skipIOCache && bytesSinceCacheFlush > 0)
-            CLibrary.trySkipCache(fd, 0, 0);
-
         try
         {
             out.close();
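
With the skip-I/O-cache bookkeeping removed, the interesting addition here is lastFlushOffset: it advances only when buffered bytes are actually handed to the file, so it is the amount of data a reader opened early could safely see. A toy illustration of that distinction (not the real class), assuming a small internal buffer:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class FlushTrackingWriter
    {
        private final OutputStream out;
        private final byte[] buffer = new byte[8];
        private int buffered = 0;
        private long lastFlushOffset = 0;

        FlushTrackingWriter(OutputStream out)
        {
            this.out = out;
        }

        void write(byte[] data) throws IOException
        {
            for (byte b : data)
            {
                if (buffered == buffer.length)
                    flush();
                buffer[buffered++] = b;
            }
        }

        void flush() throws IOException
        {
            out.write(buffer, 0, buffered);
            lastFlushOffset += buffered;             // only flushed bytes count as readable
            buffered = 0;
        }

        long getLastFlushOffset()
        {
            return lastFlushOffset;
        }

        public static void main(String[] args) throws IOException
        {
            FlushTrackingWriter w = new FlushTrackingWriter(new ByteArrayOutputStream());
            w.write("0123456789".getBytes());            // 8 bytes flushed, 2 still buffered
            System.out.println(w.getLastFlushOffset());  // 8
            w.flush();
            System.out.println(w.getLastFlushOffset());  // 10
        }
    }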


[3/3] git commit: Preemptive open of compaction results

Posted by ma...@apache.org.
Preemptive open of compaction results

Patch by benedict; reviewed by marcuse for CASSANDRA-6916


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e95953f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e95953f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e95953f

Branch: refs/heads/cassandra-2.1
Commit: 4e95953f29d89a441dfe06d3f0393ed7dd8586df
Parents: b3a225e
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Wed Apr 23 15:16:09 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Apr 23 16:25:02 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  15 +-
 .../apache/cassandra/cache/AutoSavingCache.java |   2 +-
 .../cassandra/cache/RefCountedMemory.java       |  15 +-
 .../org/apache/cassandra/config/CFMetaData.java |  78 +++--
 .../org/apache/cassandra/config/Config.java     |   4 +-
 .../cassandra/config/DatabaseDescriptor.java    |  15 +-
 .../cassandra/cql/AlterTableStatement.java      |  18 +-
 .../cql/CreateColumnFamilyStatement.java        |   3 +-
 .../cassandra/cql3/statements/CFPropDefs.java   |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +-
 .../org/apache/cassandra/db/DataTracker.java    |  15 +-
 .../org/apache/cassandra/db/Directories.java    |   2 +-
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/compaction/AbstractCompactionTask.java   |   4 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        |  91 +++--
 .../cassandra/db/compaction/CompactionTask.java | 143 +++-----
 .../db/compaction/LeveledCompactionTask.java    |   2 +-
 .../db/compaction/SSTableSplitter.java          |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  57 ++--
 .../SizeTieredCompactionStrategy.java           |   6 +-
 .../cassandra/db/compaction/Upgrader.java       |  43 +--
 .../io/compress/CompressedSequentialWriter.java |  26 +-
 .../io/compress/CompressionMetadata.java        | 181 +++++-----
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +-
 .../apache/cassandra/io/sstable/Descriptor.java |  49 ++-
 .../cassandra/io/sstable/IndexSummary.java      |  16 +-
 .../io/sstable/IndexSummaryBuilder.java         |  75 ++++-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 -
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../cassandra/io/sstable/SSTableReader.java     | 119 +++++--
 .../cassandra/io/sstable/SSTableRewriter.java   | 330 +++++++++++++++++++
 .../cassandra/io/sstable/SSTableWriter.java     | 117 ++++++-
 .../io/sstable/metadata/MetadataCollector.java  |  18 +-
 .../io/sstable/metadata/MetadataSerializer.java |   2 +-
 .../io/util/BufferedPoolingSegmentedFile.java   |   5 +
 .../io/util/BufferedSegmentedFile.java          |   5 +
 .../io/util/ChecksummedSequentialWriter.java    |   6 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  15 +-
 .../io/util/CompressedSegmentedFile.java        |  24 +-
 .../org/apache/cassandra/io/util/FileUtils.java |   5 +
 .../org/apache/cassandra/io/util/Memory.java    |  32 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |  24 +-
 .../cassandra/io/util/PoolingSegmentedFile.java |   7 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  14 +-
 .../cassandra/io/util/SequentialWriter.java     |  52 +--
 .../cassandra/service/FileCacheService.java     |  94 ++++--
 .../cassandra/streaming/StreamLockfile.java     |  13 +-
 .../cassandra/tools/StandaloneScrubber.java     |  11 -
 .../cassandra/tools/StandaloneSplitter.java     |   4 -
 .../cassandra/tools/StandaloneUpgrader.java     |   3 -
 .../org/apache/cassandra/utils/CLibrary.java    |  59 ++--
 .../cassandra/utils/obs/OffHeapBitSet.java      |   5 +-
 .../db/compaction/LongCompactionsTest.java      |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  64 +++-
 .../apache/cassandra/db/DirectoriesTest.java    |  23 +-
 .../org/apache/cassandra/db/KeyCacheTest.java   |  21 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  23 +-
 .../CompressedRandomAccessReaderTest.java       |  19 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../metadata/MetadataSerializerTest.java        |   5 +-
 .../cassandra/io/util/DataOutputTest.java       |   2 +-
 .../compress/CompressedInputStreamTest.java     |   2 +-
 65 files changed, 1345 insertions(+), 682 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 211e55c..d32a107 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -51,6 +51,7 @@
  * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
  * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
  * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
+ * Preemptive opening of compaction result (CASSANDRA-6916)
 Merged from 2.0:
  * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
  * Log a warning for large batches (CASSANDRA-6487)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8ed796b..2176bf9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -511,10 +511,11 @@ in_memory_compaction_limit_in_mb: 64
 # of compaction, including validation compaction.
 compaction_throughput_mb_per_sec: 16
 
-# Track cached row keys during compaction, and re-cache their new
-# positions in the compacted sstable.  Disable if you use really large
-# key caches.
-compaction_preheat_key_cache: true
+# When compacting, the replacement sstable(s) can be opened before they
+# are completely written, and used in place of the prior sstables for
+# any range that has been written. This helps to smoothly transfer reads
+# between the sstables, reducing page cache churn and keeping hot rows hot.
+sstable_preemptive_open_interval_in_mb: 50
 
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
@@ -730,9 +731,3 @@ internode_compression: all
 # reducing overhead from the TCP protocol itself, at the cost of increasing
 # latency if you block for cross-datacenter responses.
 inter_dc_tcp_nodelay: false
-
-# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
-# When enabled it would preheat only first "page" (4KB) of each row to optimize
-# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
-# for further details on that topic.
-preheat_kernel_page_cache: false
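
One way to read the new setting: roughly every sstable_preemptive_open_interval_in_mb of compaction output, the partially written replacement sstable is (re)opened over the data written so far and substituted for the corresponding range of the old sstables. A hypothetical gate expressing that cadence; the class and its logic are illustrative, not the actual rewriter code:

    // Hypothetical helper: decides when another interval's worth of output has been
    // written since the last early open.
    final class PreemptiveOpenGate
    {
        private final long intervalBytes;
        private long lastOpenAt = 0;

        PreemptiveOpenGate(int intervalInMb)
        {
            this.intervalBytes = intervalInMb * 1024L * 1024L;
        }

        boolean shouldOpenEarly(long bytesWrittenSoFar)
        {
            if (bytesWrittenSoFar - lastOpenAt < intervalBytes)
                return false;
            lastOpenAt = bytesWrittenSoFar;
            return true;
        }
    }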

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7b9ae95..db79a15 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -258,7 +258,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         {
             File path = getCachePath(pathInfo.keyspace, pathInfo.columnFamily, pathInfo.cfId, CURRENT_VERSION);
             File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
-            return SequentialWriter.open(tmpFile, true);
+            return SequentialWriter.open(tmpFile);
         }
 
         private void deleteOldCacheFiles()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cache/RefCountedMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RefCountedMemory.java b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
index 76d9b00..e5c543e 100644
--- a/src/java/org/apache/cassandra/cache/RefCountedMemory.java
+++ b/src/java/org/apache/cassandra/cache/RefCountedMemory.java
@@ -51,6 +51,19 @@ public class RefCountedMemory extends Memory
     public void unreference()
     {
         if (UPDATER.decrementAndGet(this) == 0)
-            free();
+            super.free();
     }
+
+    public RefCountedMemory copy(long newSize)
+    {
+        RefCountedMemory copy = new RefCountedMemory(newSize);
+        copy.put(0, this, 0, Math.min(size(), newSize));
+        return copy;
+    }
+
+    public void free()
+    {
+        throw new AssertionError();
+    }
+
 }
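
Overriding free() to throw makes the ownership rule explicit: a RefCountedMemory must be released through unreference(), and the underlying region is freed only when the last reference goes away. A minimal sketch of that contract (illustrative, not the Cassandra class):

    import java.util.concurrent.atomic.AtomicInteger;

    class RefCountedResource
    {
        private final AtomicInteger refs = new AtomicInteger(1);
        private volatile boolean freed = false;

        boolean reference()
        {
            while (true)
            {
                int n = refs.get();
                if (n <= 0)
                    return false;                    // already released; cannot be revived
                if (refs.compareAndSet(n, n + 1))
                    return true;
            }
        }

        void unreference()
        {
            if (refs.decrementAndGet() == 0)
                freed = true;                        // the real class releases its off-heap memory here
        }

        boolean isFreed()
        {
            return freed;
        }
    }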

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 72a0fc5..97fc241 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -21,7 +21,20 @@ import java.io.DataInput;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -37,16 +50,44 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.AtomDeserializer;
+import org.apache.cassandra.db.CFRowAdder;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SuperColumns;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.CType;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundCType;
+import org.apache.cassandra.db.composites.SimpleCType;
 import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -55,14 +96,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.apache.cassandra.utils.FBUtilities.*;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonList;
+import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
+import static org.apache.cassandra.utils.FBUtilities.json;
 
 /**
  * This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
@@ -82,7 +125,6 @@ public final class CFMetaData
     public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
     public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
     public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
-    public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;
 
     // Note that this is the default only for user created tables
     public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
@@ -397,7 +439,6 @@ public final class CFMetaData
     private volatile int memtableFlushPeriod = 0;
     private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
     private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
-    private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
     private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
     private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
     private volatile boolean isPurged = false;
@@ -443,7 +484,6 @@ public final class CFMetaData
     public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
     public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
     public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
-    public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
     public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
     public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
 
@@ -621,7 +661,6 @@ public final class CFMetaData
                       .maxIndexInterval(oldCFMD.maxIndexInterval)
                       .speculativeRetry(oldCFMD.speculativeRetry)
                       .memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
-                      .populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
                       .droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
                       .triggers(new HashMap<>(oldCFMD.triggers))
                       .rebuild();
@@ -673,11 +712,6 @@ public final class CFMetaData
         return ReadRepairDecision.NONE;
     }
 
-    public boolean populateIoCacheOnFlush()
-    {
-        return populateIoCacheOnFlush;
-    }
-
     public int getGcGraceSeconds()
     {
         return gcGraceSeconds;
@@ -880,7 +914,6 @@ public final class CFMetaData
             && Objects.equal(minIndexInterval, other.minIndexInterval)
             && Objects.equal(maxIndexInterval, other.maxIndexInterval)
             && Objects.equal(speculativeRetry, other.speculativeRetry)
-            && Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush)
             && Objects.equal(droppedColumns, other.droppedColumns)
             && Objects.equal(triggers, other.triggers);
     }
@@ -913,7 +946,6 @@ public final class CFMetaData
             .append(minIndexInterval)
             .append(maxIndexInterval)
             .append(speculativeRetry)
-            .append(populateIoCacheOnFlush)
             .append(droppedColumns)
             .append(triggers)
             .toHashCode();
@@ -930,8 +962,6 @@ public final class CFMetaData
     {
         if (!cf_def.isSetComment())
             cf_def.setComment("");
-        if (!cf_def.isSetPopulate_io_cache_on_flush())
-            cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
         if (!cf_def.isSetMin_compaction_threshold())
             cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
         if (!cf_def.isSetMax_compaction_threshold())
@@ -1023,7 +1053,6 @@ public final class CFMetaData
             if (cf_def.isSetSpeculative_retry())
                 newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
             if (cf_def.isSetPopulate_io_cache_on_flush())
-                newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
             if (cf_def.isSetTriggers())
                 newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
 
@@ -1125,7 +1154,6 @@ public final class CFMetaData
         memtableFlushPeriod = cfm.memtableFlushPeriod;
         defaultTimeToLive = cfm.defaultTimeToLive;
         speculativeRetry = cfm.speculativeRetry;
-        populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;
 
         if (!cfm.droppedColumns.isEmpty())
             droppedColumns = cfm.droppedColumns;
@@ -1252,7 +1280,6 @@ public final class CFMetaData
         def.setComment(Strings.nullToEmpty(comment));
         def.setRead_repair_chance(readRepairChance);
         def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
-        def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
         def.setGc_grace_seconds(gcGraceSeconds);
         def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
         def.setKey_validation_class(keyValidator.toString());
@@ -1642,7 +1669,6 @@ public final class CFMetaData
         adder.add("comment", comment);
         adder.add("read_repair_chance", readRepairChance);
         adder.add("local_read_repair_chance", dcLocalReadRepairChance);
-        adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
         adder.add("gc_grace_seconds", gcGraceSeconds);
         adder.add("default_validator", defaultValidator.toString());
         adder.add("key_validator", keyValidator.toString());
@@ -1730,9 +1756,6 @@ public final class CFMetaData
             if (result.has("max_index_interval"))
                 cfm.maxIndexInterval(result.getInt("max_index_interval"));
 
-            if (result.has("populate_io_cache_on_flush"))
-                cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));
-
             /*
              * The info previously hold by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
              * make more sense and 2) this allow to store indexing information).
@@ -2198,7 +2221,6 @@ public final class CFMetaData
             .append("minIndexInterval", minIndexInterval)
             .append("maxIndexInterval", maxIndexInterval)
             .append("speculativeRetry", speculativeRetry)
-            .append("populateIoCacheOnFlush", populateIoCacheOnFlush)
             .append("droppedColumns", droppedColumns)
             .append("triggers", triggers)
             .toString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 3cf8ff8..7ab7a8c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -177,7 +177,7 @@ public class Config
     public int hinted_handoff_throttle_in_kb = 1024;
     public int batchlog_replay_throttle_in_kb = 1024;
     public int max_hints_delivery_threads = 1;
-    public boolean compaction_preheat_key_cache = true;
+    public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;
     public boolean trickle_fsync = false;
@@ -199,8 +199,6 @@ public class Config
 
     private static boolean isClientMode = false;
 
-    public boolean preheat_kernel_page_cache = false;
-
     public Integer file_cache_size_in_mb;
 
     public boolean inter_dc_tcp_nodelay = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index ef2c4fc..4b0043c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1331,11 +1331,6 @@ public class DatabaseDescriptor
         return conf.max_hints_delivery_threads;
     }
 
-    public static boolean getPreheatKeyCache()
-    {
-        return conf.compaction_preheat_key_cache;
-    }
-
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;
@@ -1356,6 +1351,11 @@ public class DatabaseDescriptor
         return conf.commitlog_total_space_in_mb;
     }
 
+    public static int getSSTablePreempiveOpenIntervalInMB()
+    {
+        return conf.sstable_preemptive_open_interval_in_mb;
+    }
+
     public static boolean getTrickleFsync()
     {
         return conf.trickle_fsync;
@@ -1476,11 +1476,6 @@ public class DatabaseDescriptor
         return conf.inter_dc_tcp_nodelay;
     }
 
-    public static boolean shouldPreheatPageCache()
-    {
-        return conf.preheat_kernel_page_cache;
-    }
-
     public static Pool getMemtableAllocatorPool()
     {
         long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 7af65a1..5bc7011 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -17,16 +17,21 @@
  */
 package org.apache.cassandra.cql;
 
-import org.apache.cassandra.cache.CachingOptions;
-import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.io.compress.CompressionParameters;
-
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+
 public class AlterTableStatement
 {
     public static enum OperationType
@@ -183,7 +188,6 @@ public class AlterTableStatement
         cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
         cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
         cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
-        cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
         cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
         cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index b483451..4cb9eba 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -201,8 +201,7 @@ public class CreateColumnFamilyStatement
                    .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
                    .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
                    .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
-                   .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE))
-                   .populateIoCacheOnFlush(getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH));
+                   .defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));
 
             // CQL2 can have null keyAliases
             if (keyAlias != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
index 95fb750..8df0106 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java
@@ -17,7 +17,10 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.CFMetaData;
@@ -26,7 +29,6 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.compress.CompressionParameters;
-import org.apache.cassandra.service.CacheService;
 
 public class CFPropDefs extends PropertyDefinitions
 {
@@ -183,7 +185,6 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
         cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
         cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
-        cfm.populateIoCacheOnFlush(getBoolean(KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
         cfm.minIndexInterval(getInt(KW_MIN_INDEX_INTERVAL, cfm.getMinIndexInterval()));
         cfm.maxIndexInterval(getInt(KW_MAX_INDEX_INTERVAL, cfm.getMaxIndexInterval()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ea49250..49cb47d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -487,7 +487,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();
 
-            if (desc.temporary)
+            if (desc.type.isTemporary)
             {
                 SSTable.delete(desc, components);
                 continue;
@@ -680,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             if (currentDescriptors.contains(descriptor))
                 continue; // old (initialized) SSTable found, skipping
-            if (descriptor.temporary) // in the process of being written
+            if (descriptor.type.isTemporary) // in the process of being written
                 continue;
 
             if (!descriptor.isCompatible())
@@ -710,7 +710,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                descriptor.ksname,
                                                descriptor.cfname,
                                                fileIndexGenerator.incrementAndGet(),
-                                               false);
+                                               Descriptor.Type.FINAL);
             }
             while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
 
@@ -789,7 +789,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                          keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
-                                         true);
+                                         Descriptor.Type.TEMP);
         return desc.filenameFor(Component.DATA);
     }
 
@@ -1362,11 +1362,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.markObsolete(sstables, compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
-    {
-        data.replaceCompactedSSTables(sstables, replacements, compactionType);
-    }
-
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
         compactionStrategy.replaceFlushed(memtable, sstable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 9c8f9a0..e574143 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -195,11 +195,12 @@ public class DataTracker
         while (true)
         {
             View currentView = view.get();
-            Set<SSTableReader> inactive = Sets.difference(ImmutableSet.copyOf(sstables), currentView.compacting);
-            if (inactive.size() < Iterables.size(sstables))
+            Set<SSTableReader> set = ImmutableSet.copyOf(sstables);
+            Set<SSTableReader> inactive = Sets.difference(set, currentView.compacting);
+            if (inactive.size() < set.size())
                 return false;
 
-            View newView = currentView.markCompacting(inactive);
+            View newView = currentView.markCompacting(set);
             if (view.compareAndSet(currentView, newView))
                 return true;
         }
@@ -245,10 +246,12 @@ public class DataTracker
         notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Collection<SSTableReader> replacements, OperationType compactionType)
+    // note that this DOES NOT insert the replacement sstables; it only removes the old sstables and notifies any listeners
+    // that they have been replaced by the provided sstables, the insertion itself having been performed by an earlier replaceReaders() call
+    public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
     {
-        replace(sstables, replacements);
-        notifySSTablesChanged(sstables, replacements, compactionType);
+        replace(sstables, Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, allReplacements, compactionType);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)
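
The markCompacting change snapshots the candidate sstables once and then marks exactly that set, rather than marking only the "inactive" subset it had just computed; the check-and-publish still happens inside a compare-and-set loop over an immutable view. A loose analogue of that pattern (not the real View class):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;

    class CompactingTracker<T>
    {
        private final AtomicReference<Set<T>> compacting =
                new AtomicReference<>(Collections.<T>emptySet());

        boolean markCompacting(Set<T> candidates)
        {
            while (true)
            {
                Set<T> current = compacting.get();
                for (T t : candidates)
                    if (current.contains(t))
                        return false;                // one of them is already being compacted

                Set<T> next = new HashSet<>(current);
                next.addAll(candidates);
                if (compacting.compareAndSet(current, Collections.unmodifiableSet(next)))
                    return true;                     // published atomically; otherwise we raced and retry
            }
        }
    }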

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index a6b7efa..1350be2 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -501,7 +501,7 @@ public class Directories
                     if (pair == null)
                         return false;
 
-                    if (skipTemporary && pair.left.temporary)
+                    if (skipTemporary && pair.left.type.isTemporary)
                         return false;
 
                     Set<Component> previous = components.get(pair.left);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index b7b3782..0691819 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -169,7 +169,7 @@ public abstract class AbstractCompactionStrategy
 
     public AbstractCompactionTask getCompactionTask(Collection<SSTableReader> sstables, final int gcBefore, long maxSSTableBytes)
     {
-        return new CompactionTask(cfs, sstables, gcBefore);
+        return new CompactionTask(cfs, sstables, gcBefore, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index f1fcbd1..5f9c7ae 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.io.util.DiskAwareRunnable;
 public abstract class AbstractCompactionTask extends DiskAwareRunnable
 {
     protected final ColumnFamilyStore cfs;
-    protected Iterable<SSTableReader> sstables;
+    protected Set<SSTableReader> sstables;
     protected boolean isUserDefined;
     protected OperationType compactionType;
 
@@ -36,7 +36,7 @@ public abstract class AbstractCompactionTask extends DiskAwareRunnable
      * @param cfs
      * @param sstables must be marked compacting
      */
-    public AbstractCompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+    public AbstractCompactionTask(ColumnFamilyStore cfs, Set<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.sstables = sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1fc1eda..3fe5c26 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -127,7 +127,7 @@ public class CompactionController implements AutoCloseable
                         candidate, candidate.getSSTableMetadata().maxLocalDeletionTime, gcBefore);
             }
         }
-        return new HashSet<SSTableReader>(candidates);
+        return new HashSet<>(candidates);
     }
 
     public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b1f0c2a..8343e86 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -45,6 +45,7 @@ import javax.management.openmbean.TabularData;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -74,6 +75,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
@@ -424,12 +426,8 @@ public class CompactionManager implements CompactionManagerMBean
         }
         cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
         cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-        Collection<SSTableReader> antiCompactedSSTables = null;
         if (!sstables.isEmpty())
-            antiCompactedSSTables = doAntiCompaction(cfs, ranges, sstables, repairedAt);
-        // verify that there are tables to be swapped, otherwise CFS#replaceCompactedSSTables will hang.
-        if (antiCompactedSSTables != null && antiCompactedSSTables.size() > 0)
-            cfs.replaceCompactedSSTables(sstables, antiCompactedSSTables, OperationType.ANTICOMPACTION);
+            doAntiCompaction(cfs, ranges, sstables, repairedAt);
         SSTableReader.releaseReferences(sstables);
         cfs.getDataTracker().unmarkCompacting(sstables);
         logger.info(String.format("Completed anticompaction successfully"));
@@ -585,7 +583,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
     {
-        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted);
+        Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
 
         CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
         metrics.beginCompaction(scrubInfo);
@@ -598,14 +596,6 @@ public class CompactionManager implements CompactionManagerMBean
             scrubber.close();
             metrics.finishCompaction(scrubInfo);
         }
-
-        if (scrubber.getNewInOrderSSTable() != null)
-            cfs.addSSTable(scrubber.getNewInOrderSSTable());
-
-        if (scrubber.getNewSSTable() == null)
-            cfs.markObsolete(Collections.singletonList(sstable), OperationType.SCRUB);
-        else
-            cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
     }
 
     /**
@@ -683,9 +673,10 @@ public class CompactionManager implements CompactionManagerMBean
 
         for (SSTableReader sstable : sstables)
         {
-            if (!hasIndexes && !new Bounds<Token>(sstable.first.token, sstable.last.token).intersects(ranges))
+            if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
             {
-                cfs.replaceCompactedSSTables(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+                cfs.getDataTracker().replaceReaders(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+                cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
                 continue;
             }
             if (!needsCleanup(sstable, ranges))
@@ -714,20 +705,18 @@ public class CompactionManager implements CompactionManagerMBean
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
             metrics.beginCompaction(ci);
-            SSTableWriter writer = createWriter(cfs,
-                                                compactionFileLocation,
-                                                expectedBloomFilterSize,
-                                                sstable.getSSTableMetadata().repairedAt,
-                                                sstable);
-            SSTableReader newSstable = null;
+            SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+
             try
             {
+                writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
                 while (scanner.hasNext())
                 {
                     if (ci.isStopRequested())
                         throw new CompactionInterruptedException(ci.getCompactionInfo());
-                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
                     row = cleanupStrategy.cleanup(row);
                     if (row == null)
                         continue;
@@ -735,10 +724,11 @@ public class CompactionManager implements CompactionManagerMBean
                     if (writer.append(compactedRow) != null)
                         totalkeysWritten++;
                 }
-                if (totalkeysWritten > 0)
-                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
-                else
-                    writer.abort();
+
+                // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+                cfs.indexManager.flushIndexesBlocking();
+
+                writer.finish();
             }
             catch (Throwable e)
             {
@@ -752,23 +742,18 @@ public class CompactionManager implements CompactionManagerMBean
                 metrics.finishCompaction(ci);
             }
 
-            List<SSTableReader> results = new ArrayList<SSTableReader>(1);
-            if (newSstable != null)
+            List<SSTableReader> results = writer.finished();
+            if (!results.isEmpty())
             {
-                results.add(newSstable);
-
                 String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                 long startsize = sstable.onDiskLength();
-                long endsize = newSstable.onDiskLength();
+                long endsize = 0;
+                for (SSTableReader newSstable : results)
+                    endsize += newSstable.onDiskLength();
                 double ratio = (double) endsize / (double) startsize;
-                logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+                logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
             }
-
-            // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
-            cfs.indexManager.flushIndexesBlocking();
-
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
         }
     }
 
@@ -989,14 +974,21 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             logger.info("Anticompacting {}", sstable);
+            Set<SSTableReader> sstableAsSet = new HashSet<>();
+            sstableAsSet.add(sstable);
+
             File destination = cfs.directories.getDirectoryForCompactedSSTables();
-            SSTableWriter repairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
-            SSTableWriter unRepairedSSTableWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable);
+            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+
+            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
 
             try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
-                AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-                List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
+                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+
                 CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
 
                 try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
@@ -1018,16 +1010,13 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
                 }
+                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+                repairedSSTableWriter.finish(false, repairedAt);
+                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
                 // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
-                if (repairedKeyCount > 0)
-                    anticompactedSSTables.add(repairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
-                else
-                    repairedSSTableWriter.abort();
-                // supply null as we keep SSTableMetadata#repairedAt empty if the table isn't repaired
-                if (unrepairedKeyCount > 0)
-                    anticompactedSSTables.add(unRepairedSSTableWriter.closeAndOpenReader(sstable.maxDataAge));
-                else
-                    unRepairedSSTableWriter.abort();
+                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
+                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
             }
             catch (Throwable e)
             {
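
The anticompaction hunk above drives two rewriters over the same set of source readers, which is why the first one is finished with finish(false, repairedAt): it must leave the shared readers open so that the second finish can release them exactly once. Below is a minimal, self-contained sketch of that ordering; SharedReaders and Rewriter are hypothetical stand-ins, not Cassandra's SSTableReader/SSTableRewriter.

import java.util.ArrayList;
import java.util.List;

public class AnticompactionSketch
{
    /** Hypothetical stand-in for the source readers shared by both rewriters. */
    static final class SharedReaders implements AutoCloseable
    {
        public void close() { System.out.println("source readers closed once"); }
    }

    /** Hypothetical stand-in for a rewriter: collects rows and may close the shared source. */
    static final class Rewriter
    {
        private final List<String> rows = new ArrayList<>();
        void append(String row) { rows.add(row); }
        List<String> finish(SharedReaders shared, boolean closeShared)
        {
            if (closeShared)
                shared.close();     // only the last finisher is allowed to do this
            return rows;
        }
    }

    public static void main(String[] args)
    {
        SharedReaders shared = new SharedReaders();
        Rewriter repaired = new Rewriter();
        Rewriter unrepaired = new Rewriter();

        repaired.append("repaired-row");
        unrepaired.append("unrepaired-row");

        // First finisher leaves the shared readers open (cf. finish(false, repairedAt) in the patch);
        // the second one can then close them safely, exactly once.
        List<String> repairedOut = repaired.finish(shared, false);
        List<String> unrepairedOut = unrepaired.finish(shared, true);
        System.out.println(repairedOut + " " + unrepairedOut);
    }
}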

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f94ef93..77dc7b0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,23 +19,30 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.*;
-import com.google.common.collect.*;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -45,15 +52,15 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
+    private final boolean offline;
     protected static long totalBytesCompacted = 0;
-    private Set<SSTableReader> toCompact;
     private CompactionExecutorStatsCollector collector;
 
-    public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, final int gcBefore)
+    public CompactionTask(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, int gcBefore, boolean offline)
     {
-        super(cfs, sstables);
+        super(cfs, Sets.newHashSet(sstables));
         this.gcBefore = gcBefore;
-        toCompact = Sets.newHashSet(sstables);
+        this.offline = offline;
     }
 
     public static synchronized long addToTotalBytesCompacted(long bytesCompacted)
@@ -65,23 +72,23 @@ public class CompactionTask extends AbstractCompactionTask
     {
         this.collector = collector;
         run();
-        return toCompact.size();
+        return sstables.size();
     }
 
     public long getExpectedWriteSize()
     {
-        return cfs.getExpectedCompactedFileSize(toCompact, compactionType);
+        return cfs.getExpectedCompactedFileSize(sstables, compactionType);
     }
 
     public boolean reduceScopeForLimitedSpace()
     {
-        if (partialCompactionsAcceptable() && toCompact.size() > 1)
+        if (partialCompactionsAcceptable() && sstables.size() > 1)
         {
             // Try again w/o the largest one.
-            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(toCompact, ", "));
+            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
             // Note that we have removed files that are still marked as compacting.
             // This suboptimal but ok since the caller will unmark all the sstables at the end.
-            return toCompact.remove(cfs.getMaxSizeFile(toCompact));
+            return sstables.remove(cfs.getMaxSizeFile(sstables));
         }
         else
         {
@@ -100,7 +107,7 @@ public class CompactionTask extends AbstractCompactionTask
         // it is not empty, it may compact down to nothing if all rows are deleted.
         assert sstables != null && sstableDirectory != null;
 
-        if (toCompact.size() == 0)
+        if (sstables.size() == 0)
             return;
 
         // Note that the current compaction strategy, is not necessarily the one this task was created under.
@@ -111,7 +118,7 @@ public class CompactionTask extends AbstractCompactionTask
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
         // sanity check: all sstables must belong to the same cfs
-        assert !Iterables.any(toCompact, new Predicate<SSTableReader>()
+        assert !Iterables.any(sstables, new Predicate<SSTableReader>()
         {
             @Override
             public boolean apply(SSTableReader sstable)
@@ -120,15 +127,15 @@ public class CompactionTask extends AbstractCompactionTask
             }
         });
 
-        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+        UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(toCompact);
-        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+        CompactionController controller = getCompactionController(sstables);
+        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
 
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
-        logger.info("Compacting {}", toCompact);
+        logger.info("Compacting {}", sstables);
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
@@ -137,19 +144,18 @@ public class CompactionTask extends AbstractCompactionTask
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
+        // TODO: errors when creating the scanners can result in untidied resources
         AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
 
         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
         // replace the old entries.  Track entries to preheat here until then.
-        Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<>();
-
-        Collection<SSTableReader> sstables = new ArrayList<>();
-        Collection<SSTableWriter> writers = new ArrayList<>();
         long minRepairedAt = getMinRepairedAt(actuallyCompact);
+        // we only need the age of the data that we're actually retaining
+        long maxAge = getMaxDataAge(actuallyCompact);
         if (collector != null)
             collector.beginCompaction(ci);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
         try
         {
             if (!iter.hasNext())
@@ -157,75 +163,34 @@ public class CompactionTask extends AbstractCompactionTask
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
-                cfs.markObsolete(toCompact, compactionType);
+                cfs.markObsolete(sstables, compactionType);
                 return;
             }
 
-            SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
-            writers.add(writer);
+            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
             while (iter.hasNext())
             {
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
                 AbstractCompactedRow row = iter.next();
-                RowIndexEntry indexEntry = writer.append(row);
-                if (indexEntry == null)
-                {
-                    controller.invalidateCachedRow(row.key);
-                    row.close();
-                    continue;
-                }
-
-                totalKeysWritten++;
-
-                if (DatabaseDescriptor.getPreheatKeyCache())
+                if (writer.append(row) != null)
                 {
-                    for (SSTableReader sstable : actuallyCompact)
+                    totalKeysWritten++;
+                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
                     {
-                        if (sstable.getCachedPosition(row.key, false) != null)
-                        {
-                            cachedKeys.put(row.key, indexEntry);
-                            break;
-                        }
+                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
                     }
                 }
-
-                if (newSSTableSegmentThresholdReached(writer))
-                {
-                    // tmp = false because later we want to query it with descriptor from SSTableReader
-                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                    writer = createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt);
-                    writers.add(writer);
-                    cachedKeys = new HashMap<>();
-                }
             }
 
-            if (writer.getFilePointer() > 0)
-            {
-                cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-            }
-            else
-            {
-                writer.abort();
-                writers.remove(writer);
-            }
-
-            long maxAge = getMaxDataAge(toCompact);
-            for (SSTableWriter completedWriter : writers)
-                sstables.add(completedWriter.closeAndOpenReader(maxAge));
+            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+            writer.finish(false);
         }
         catch (Throwable t)
         {
-            for (SSTableWriter writer : writers)
-                writer.abort();
-            // also remove already completed SSTables
-            for (SSTableReader sstable : sstables)
-            {
-                sstable.markObsolete();
-                sstable.releaseReference();
-            }
-            throw Throwables.propagate(t);
+            writer.abort();
+            throw t;
         }
         finally
         {
@@ -251,20 +216,19 @@ public class CompactionTask extends AbstractCompactionTask
             }
         }
 
-        replaceCompactedSSTables(toCompact, sstables);
-        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-        for (SSTableReader sstable : sstables)
-            sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+        Collection<SSTableReader> oldSStables = this.sstables;
+        List<SSTableReader> newSStables = writer.finished();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 
         // log a bunch of statistics about the result and save to system table compaction_history
         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(toCompact);
-        long endsize = SSTableReader.getTotalBytes(sstables);
+        long startsize = SSTableReader.getTotalBytes(oldSStables);
+        long endsize = SSTableReader.getTotalBytes(newSStables);
         double ratio = (double) endsize / (double) startsize;
 
-        StringBuilder builder = new StringBuilder();
-        for (SSTableReader reader : sstables)
-            builder.append(reader.descriptor.baseFilename()).append(",");
+        StringBuilder newSSTableNames = new StringBuilder();
+        for (SSTableReader reader : newSStables)
+            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 
         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
         long totalSourceRows = 0;
@@ -285,7 +249,7 @@ public class CompactionTask extends AbstractCompactionTask
 
         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
         logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                         toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
         logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
@@ -307,7 +271,7 @@ public class CompactionTask extends AbstractCompactionTask
                                  repairedAt,
                                  cfs.metadata,
                                  cfs.partitioner,
-                                 new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
+                                 new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
     }
 
     protected int getLevel()
@@ -315,11 +279,6 @@ public class CompactionTask extends AbstractCompactionTask
         return 0;
     }
 
-    protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
-    {
-        cfs.replaceCompactedSSTables(compacted, replacements, compactionType);
-    }
-
     protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
     {
         return new CompactionController(cfs, toCompact, gcBefore);
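
The rewritten CompactionTask above replaces the hand-maintained lists of writers, readers and cached keys with a single rewriter: switchWriter() starts a new output whenever the segment threshold is reached, append() feeds rows to the current output, finish()/finished() hand back the completed readers, and abort() discards partial output on any Throwable. A small, self-contained sketch of that lifecycle follows; Rewriter and Segment are hypothetical stand-ins rather than Cassandra's SSTableRewriter and SSTableWriter.

import java.util.ArrayList;
import java.util.List;

public class RewriterLifecycleSketch
{
    /** Hypothetical stand-in for one output sstable. */
    static final class Segment
    {
        final List<String> rows = new ArrayList<>();
        long size() { return rows.size(); }
    }

    /** Hypothetical stand-in for the rewriter lifecycle: switch, append, finish or abort. */
    static final class Rewriter
    {
        private final List<Segment> finished = new ArrayList<>();
        private Segment current;

        void switchWriter(Segment next)
        {
            if (current != null)
                finished.add(current);          // seal the previous segment before starting the next
            current = next;
        }

        Segment currentWriter() { return current; }

        void append(String row) { current.rows.add(row); }

        List<Segment> finish()
        {
            if (current != null && current.size() > 0)
                finished.add(current);          // an empty trailing segment is simply dropped
            current = null;
            return finished;
        }

        void abort()
        {
            finished.clear();                   // discard partial output on failure
            current = null;
        }
    }

    public static void main(String[] args)
    {
        final long threshold = 2;               // rows per segment; stand-in for newSSTableSegmentThresholdReached
        Rewriter writer = new Rewriter();
        writer.switchWriter(new Segment());
        try
        {
            for (String row : new String[]{ "a", "b", "c", "d", "e" })
            {
                writer.append(row);
                if (writer.currentWriter().size() >= threshold)
                    writer.switchWriter(new Segment());
            }
            System.out.println("segments written: " + writer.finish().size());
        }
        catch (RuntimeException t)
        {
            writer.abort();
            throw t;
        }
    }
}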

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index f64f633..2731b6d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -30,7 +30,7 @@ public class LeveledCompactionTask extends CompactionTask
 
     public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, long maxSSTableBytes)
     {
-        super(cfs, sstables, gcBefore);
+        super(cfs, sstables, gcBefore, false);
         this.level = level;
         this.maxSSTableBytes = maxSSTableBytes;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index a14ab43..67705e0 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -57,7 +57,7 @@ public class SSTableSplitter {
 
         public SplittingCompactionTask(ColumnFamilyStore cfs, SSTableReader sstable, int sstableSizeInMB)
         {
-            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC);
+            super(cfs, Collections.singletonList(sstable), CompactionManager.NO_GC, true);
             this.sstableSizeInMB = sstableSizeInMB;
 
             if (sstableSizeInMB <= 0)
@@ -71,11 +71,6 @@ public class SSTableSplitter {
         }
 
         @Override
-        protected void replaceCompactedSSTables(Collection<SSTableReader> compacted, Collection<SSTableReader> replacements)
-        {
-        }
-
-        @Override
         protected boolean newSSTableSegmentThresholdReached(SSTableWriter writer)
         {
             return writer.getOnDiskFilePointer() > sstableSizeInMB * 1024L * 1024L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 01da2e1..d61f62b 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.utils.OutputHandler;
 
 public class Scrubber implements Closeable
 {
-    public final ColumnFamilyStore cfs;
-    public final SSTableReader sstable;
-    public final File destination;
-    public final boolean skipCorrupted;
+    private final ColumnFamilyStore cfs;
+    private final SSTableReader sstable;
+    private final File destination;
+    private final boolean skipCorrupted;
 
     private final CompactionController controller;
     private final boolean isCommutative;
@@ -46,7 +46,8 @@ public class Scrubber implements Closeable
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
 
-    private SSTableWriter writer;
+    private final boolean isOffline;
+
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
 
@@ -65,9 +66,9 @@ public class Scrubber implements Closeable
     };
     private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
-    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, boolean isOffline) throws IOException
     {
-        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), false);
+        this(cfs, sstable, skipCorrupted, new OutputHandler.LogOutput(), isOffline);
     }
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline) throws IOException
@@ -76,6 +77,7 @@ public class Scrubber implements Closeable
         this.sstable = sstable;
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
+        this.isOffline = isOffline;
 
         // Calculate the expected compacted filesize
         this.destination = cfs.directories.getDirectoryForCompactedSSTables();
@@ -104,6 +106,7 @@ public class Scrubber implements Closeable
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
+        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,8 +116,7 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            // TODO errors when creating the writer may leave empty temp files.
-            writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable);
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
             DecoratedKey prevKey = null;
 
@@ -166,7 +168,6 @@ public class Scrubber implements Closeable
 
                 assert currentIndexKey != null || indexFile.isEOF();
 
-                writer.mark();
                 try
                 {
                     if (key == null)
@@ -182,7 +183,7 @@ public class Scrubber implements Closeable
                     }
 
                     AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                    if (writer.append(compactedRow) == null)
+                    if (writer.tryAppend(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
@@ -194,7 +195,6 @@ public class Scrubber implements Closeable
                 {
                     throwIfFatal(th);
                     outputHandler.warn("Error reading row (stacktrace follows):", th);
-                    writer.resetAndTruncate();
 
                     if (currentIndexKey != null
                         && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
@@ -212,7 +212,7 @@ public class Scrubber implements Closeable
                             }
 
                             AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
-                            if (writer.append(compactedRow) == null)
+                            if (writer.tryAppend(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
@@ -224,7 +224,6 @@ public class Scrubber implements Closeable
                             throwIfCommutative(key, th2);
 
                             outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
-                            writer.resetAndTruncate();
                             dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
@@ -241,16 +240,27 @@ public class Scrubber implements Closeable
                 }
             }
 
-            if (writer.getFilePointer() > 0)
+            if (!outOfOrderRows.isEmpty())
             {
+                // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-                newSstable = writer.closeAndOpenReader(sstable.maxDataAge, repairedAt);
+                SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
+                for (Row row : outOfOrderRows)
+                    inOrderWriter.append(row.key, row.cf);
+                newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+                if (!isOffline)
+                    cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
+                outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
             }
+
+            // finish obsoletes the old sstable
+            writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+            if (!writer.finished().isEmpty())
+                newSstable = writer.finished().get(0);
         }
         catch (Throwable t)
         {
-            if (writer != null)
-                writer.abort();
+            writer.abort();
             throw Throwables.propagate(t);
         }
         finally
@@ -258,17 +268,6 @@ public class Scrubber implements Closeable
             controller.close();
         }
 
-        if (!outOfOrderRows.isEmpty())
-        {
-            // out of order rows, but no bad rows found - we can keep our repairedAt time
-            long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-            SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
-            for (Row row : outOfOrderRows)
-                inOrderWriter.append(row.key, row.cf);
-            newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
-            outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
-        }
-
         if (newSstable == null)
         {
             if (badRows > 0)
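
In the reworked scrub path above, rows are appended with tryAppend() so a bad row no longer requires mark()/resetAndTruncate() on the writer, and rows that arrive out of key order are set aside and written, sorted, to a separate in-order sstable before the main writer is finished. The sketch below illustrates only that split between in-order and diverted rows; the names are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class ScrubOrderSketch
{
    public static void main(String[] args)
    {
        // Keys in the order they appear in a (possibly corrupt) data file.
        List<Integer> scanned = Arrays.asList(1, 3, 2, 5, 4);

        List<Integer> inOrder = new ArrayList<>();       // main scrub output
        TreeSet<Integer> outOfOrder = new TreeSet<>();   // kept sorted, like Scrubber's outOfOrderRows

        Integer prev = null;
        for (Integer key : scanned)
        {
            if (prev != null && key < prev)
            {
                outOfOrder.add(key);                     // would break sstable ordering; divert it
                continue;
            }
            inOrder.add(key);
            prev = key;
        }

        // The diverted rows become their own, correctly ordered output at the end.
        List<Integer> secondOutput = new ArrayList<>(outOfOrder);
        System.out.println("in order: " + inOrder + ", rewritten separately: " + secondOutput);
    }
}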

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 763d20b..461c5e1 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -270,7 +270,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 return null;
 
             if (cfs.getDataTracker().markCompacting(hottestBucket))
-                return new CompactionTask(cfs, hottestBucket, gcBefore);
+                return new CompactionTask(cfs, hottestBucket, gcBefore, false);
         }
     }
 
@@ -289,7 +289,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             else
                 unrepaired.add(sstable);
         }
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore), new CompactionTask(cfs, unrepaired, gcBefore));
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore, false), new CompactionTask(cfs, unrepaired, gcBefore, false));
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
@@ -302,7 +302,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             return null;
         }
 
-        return new CompactionTask(cfs, sstables, gcBefore).setUserDefined(true);
+        return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true);
     }
 
     public int getEstimatedRemainingTasks()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 740a3eb..734fe23 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.OutputHandler;
 
@@ -34,7 +35,7 @@ public class Upgrader
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
-    private final Collection<SSTableReader> toUpgrade;
+    private final Set<SSTableReader> toUpgrade;
     private final File directory;
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -48,7 +49,7 @@ public class Upgrader
     {
         this.cfs = cfs;
         this.sstable = sstable;
-        this.toUpgrade = Collections.singletonList(sstable);
+        this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
         this.outputHandler = outputHandler;
 
         this.directory = new File(sstable.getFilename()).getParentFile();
@@ -86,56 +87,28 @@ public class Upgrader
     {
         outputHandler.output("Upgrading " + sstable);
 
-
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller);
-
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
-        try
+        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
+        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
         {
-            SSTableWriter writer = createCompactionWriter(sstable.getSSTableMetadata().repairedAt);
-            writers.add(writer);
+            writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {
                 AbstractCompactedRow row = iter.next();
-
                 writer.append(row);
             }
 
-            long maxAge = CompactionTask.getMaxDataAge(this.toUpgrade);
-            for (SSTableWriter completedWriter : writers)
-                sstables.add(completedWriter.closeAndOpenReader(maxAge));
-
+            writer.finish();
             outputHandler.output("Upgrade of " + sstable + " complete.");
 
         }
         catch (Throwable t)
         {
-            for (SSTableWriter writer : writers)
-                writer.abort();
-            // also remove already completed SSTables
-            for (SSTableReader sstable : sstables)
-            {
-                sstable.markObsolete();
-                sstable.releaseReference();
-            }
+            writer.abort();
             throw Throwables.propagate(t);
         }
         finally
         {
             controller.close();
-
-            try
-            {
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
         }
     }
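
The Upgrader change above drops the manual writer/reader cleanup in favour of the rewriter's finish()/abort() pair, and lets try-with-resources close the row iterator on every path. A rough sketch of that shape, with hypothetical Source and Output types standing in for the compaction scanner and the rewriter:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class UpgradeFlowSketch
{
    /** Hypothetical stand-in for the row scanner; closed by try-with-resources on every path. */
    static final class Source implements AutoCloseable, Iterable<String>
    {
        private final List<String> rows = Arrays.asList("r1", "r2", "r3");
        public Iterator<String> iterator() { return rows.iterator(); }
        public void close() { System.out.println("scanner closed"); }
    }

    /** Hypothetical stand-in for the output rewriter. */
    static final class Output
    {
        void append(String row) { System.out.println("wrote " + row); }
        void finish() { System.out.println("output finished"); }
        void abort()  { System.out.println("output aborted"); }
    }

    public static void main(String[] args)
    {
        Output writer = new Output();
        try (Source source = new Source())
        {
            for (String row : source)
                writer.append(row);
            writer.finish();
        }
        catch (RuntimeException t)
        {
            writer.abort();     // discard partial results, mirroring writer.abort() in the patch
            throw t;
        }
    }
}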
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index b8a21cc..e533b1e 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -56,19 +56,17 @@ public class CompressedSequentialWriter extends SequentialWriter
 
     public CompressedSequentialWriter(File file,
                                       String offsetsPath,
-                                      boolean skipIOCache,
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
     {
-        super(file, parameters.chunkLength(), skipIOCache);
+        super(file, parameters.chunkLength());
         this.compressor = parameters.sstableCompressor;
 
         // buffer for compression should be the same size as buffer itself
         compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
 
         /* Index File (-CompressionInfo.db component) and it's header */
-        metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
-        metadataWriter.writeHeader(parameters);
+        metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
@@ -102,8 +100,7 @@ public class CompressedSequentialWriter extends SequentialWriter
     @Override
     protected void flushData()
     {
-        seekToChunkStart();
-
+        seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation
 
         int compressedLength;
         try
@@ -122,7 +119,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         try
         {
             // write an offset of the newly written chunk to the index file
-            metadataWriter.writeLong(chunkOffset);
+            metadataWriter.addOffset(chunkOffset);
             chunkCount++;
 
             assert compressedLength <= compressed.buffer.length;
@@ -131,6 +128,7 @@ public class CompressedSequentialWriter extends SequentialWriter
             out.write(compressed.buffer, 0, compressedLength);
             // write corresponding checksum
             crcMetadata.append(compressed.buffer, 0, compressedLength);
+            lastFlushOffset += compressedLength + 4;
         }
         catch (IOException e)
         {
@@ -141,6 +139,17 @@ public class CompressedSequentialWriter extends SequentialWriter
         chunkOffset += compressedLength + 4;
     }
 
+    public CompressionMetadata openEarly()
+    {
+        return metadataWriter.openEarly(originalSize, chunkOffset);
+    }
+
+    public CompressionMetadata openAfterClose()
+    {
+        assert current == originalSize;
+        return metadataWriter.openAfterClose(current, chunkOffset);
+    }
+
     @Override
     public FileMark mark()
     {
@@ -246,10 +255,9 @@ public class CompressedSequentialWriter extends SequentialWriter
 
         super.close();
         sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
-        metadataWriter.finalizeHeader(current, chunkCount);
         try
         {
-            metadataWriter.close();
+            metadataWriter.close(current, chunkCount);
         }
         catch (IOException e)
         {