Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/23 16:29:43 UTC

[2/4] Preemptive open of compaction results

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 9d7729b..8cd8c9f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -17,14 +17,29 @@
  */
 package org.apache.cassandra.io.compress;
 
-import java.io.*;
-import java.util.*;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -45,6 +60,7 @@ public class CompressionMetadata
     public final long compressedFileLength;
     public final boolean hasPostCompressionAdlerChecksums;
     private final Memory chunkOffsets;
+    private final long chunkOffsetsSize;
     public final String indexFilePath;
     public final CompressionParameters parameters;
 
@@ -85,7 +101,7 @@ public class CompressionMetadata
         {
             String compressorName = stream.readUTF();
             int optionCount = stream.readInt();
-            Map<String, String> options = new HashMap<String, String>();
+            Map<String, String> options = new HashMap<>();
             for (int i = 0; i < optionCount; ++i)
             {
                 String key = stream.readUTF();
@@ -114,6 +130,19 @@ public class CompressionMetadata
         {
             FileUtils.closeQuietly(stream);
         }
+        this.chunkOffsetsSize = chunkOffsets.size();
+    }
+
+    private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+    {
+        this.indexFilePath = filePath;
+        this.parameters = parameters;
+        this.dataLength = dataLength;
+        this.compressedFileLength = compressedLength;
+        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+        this.chunkOffsets = offsets;
+        offsets.reference();
+        this.chunkOffsetsSize = offsetsSize;
     }
 
     public ICompressor compressor()
@@ -173,7 +202,7 @@ public class CompressionMetadata
         // position of the chunk
         int idx = 8 * (int) (position / parameters.chunkLength());
 
-        if (idx >= chunkOffsets.size())
+        if (idx >= chunkOffsetsSize)
             throw new CorruptSSTableException(new EOFException(), indexFilePath);
 
         long chunkOffset = chunkOffsets.getLong(idx);
@@ -207,7 +236,7 @@ public class CompressionMetadata
             {
                 long offset = i * 8;
                 long chunkOffset = chunkOffsets.getLong(offset);
-                long nextChunkOffset = offset + 8 == chunkOffsets.size()
+                long nextChunkOffset = offset + 8 == chunkOffsetsSize
                                      ? compressedFileLength
                                      : chunkOffsets.getLong(offset + 8);
                 offsets.add(new Chunk(chunkOffset, (int) (nextChunkOffset - chunkOffset - 4))); // "4" bytes reserved for checksum
@@ -218,52 +247,60 @@ public class CompressionMetadata
 
     public void close()
     {
-        chunkOffsets.free();
+        if (chunkOffsets instanceof RefCountedMemory)
+            ((RefCountedMemory) chunkOffsets).unreference();
+        else
+            chunkOffsets.free();
     }
 
-    public static class Writer extends RandomAccessFile
+    public static class Writer
     {
-        // place for uncompressed data length in the index file
-        private long dataLengthOffset = -1;
         // path to the file
+        private final CompressionParameters parameters;
         private final String filePath;
+        private int maxCount = 100;
+        private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+        private int count = 0;
 
-        private Writer(String path) throws FileNotFoundException
+        private Writer(CompressionParameters parameters, String path)
         {
-            super(path, "rw");
+            this.parameters = parameters;
             filePath = path;
         }
 
-        public static Writer open(String path)
+        public static Writer open(CompressionParameters parameters, String path)
         {
-            try
-            {
-                return new Writer(path);
-            }
-            catch (FileNotFoundException e)
+            return new Writer(parameters, path);
+        }
+
+        public void addOffset(long offset)
+        {
+            if (count == maxCount)
             {
-                throw new RuntimeException(e);
+                RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+                offsets.unreference();
+                offsets = newOffsets;
             }
+            offsets.setLong(8 * count++, offset);
         }
 
-        public void writeHeader(CompressionParameters parameters)
+        private void writeHeader(DataOutput out, long dataLength, int chunks)
         {
             try
             {
-                writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
-                writeInt(parameters.otherOptions.size());
+                out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName());
+                out.writeInt(parameters.otherOptions.size());
                 for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet())
                 {
-                    writeUTF(entry.getKey());
-                    writeUTF(entry.getValue());
+                    out.writeUTF(entry.getKey());
+                    out.writeUTF(entry.getValue());
                 }
 
                 // store the length of the chunk
-                writeInt(parameters.chunkLength());
+                out.writeInt(parameters.chunkLength());
                 // store position and reserve a place for uncompressed data length and chunks count
-                dataLengthOffset = getFilePointer();
-                writeLong(-1);
-                writeInt(-1);
+                out.writeLong(dataLength);
+                out.writeInt(chunks);
             }
             catch (IOException e)
             {
@@ -271,36 +308,16 @@ public class CompressionMetadata
             }
         }
 
-        public void finalizeHeader(long dataLength, int chunks)
+        public CompressionMetadata openEarly(long dataLength, long compressedLength)
         {
-            assert dataLengthOffset != -1 : "writeHeader wasn't called";
-
-            long currentPosition;
-            try
-            {
-                currentPosition = getFilePointer();
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, filePath);
-            }
-
-            try
-            {
-                // seek back to the data length position
-                seek(dataLengthOffset);
-
-                // write uncompressed data length and chunks count
-                writeLong(dataLength);
-                writeInt(chunks);
+            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
+        }
 
-                // seek forward to the previous position
-                seek(currentPosition);
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, filePath);
-            }
+        public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
+        {
+            RefCountedMemory newOffsets = offsets.copy(count * 8L);
+            offsets.unreference();
+            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }
 
         /**
@@ -312,33 +329,7 @@ public class CompressionMetadata
          */
         public long chunkOffsetBy(int chunkIndex)
         {
-            if (dataLengthOffset == -1)
-                throw new IllegalStateException("writeHeader wasn't called");
-
-            try
-            {
-                long position = getFilePointer();
-
-                // seek to the position of the given chunk
-                seek(dataLengthOffset
-                     + 8 // size reserved for uncompressed data length
-                     + 4 // size reserved for chunk count
-                     + (chunkIndex * 8L));
-
-                try
-                {
-                    return readLong();
-                }
-                finally
-                {
-                    // back to the original position
-                    seek(position);
-                }
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, filePath);
-            }
+            return offsets.getLong(chunkIndex * 8);
         }
 
         /**
@@ -347,25 +338,17 @@ public class CompressionMetadata
          */
         public void resetAndTruncate(int chunkIndex)
         {
-            try
-            {
-                seek(dataLengthOffset
-                     + 8 // size reserved for uncompressed data length
-                     + 4 // size reserved for chunk count
-                     + (chunkIndex * 8L));
-                getChannel().truncate(getFilePointer());
-            }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, filePath);
-            }
+            count = chunkIndex;
         }
 
-        public void close() throws IOException
+        public void close(long dataLength, int chunks) throws IOException
         {
-            if (getChannel().isOpen()) // if RAF.closed were public we could just use that, but it's not
-                getChannel().force(true);
-            super.close();
+            final DataOutputStream out = new DataOutputStream(new FileOutputStream(filePath));
+            assert chunks == count;
+            writeHeader(out, dataLength, chunks);
+            for (int i = 0 ; i < count ; i++)
+                out.writeLong(offsets.getLong(i * 8));
+            out.close();
         }
     }
 

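The rewritten Writer above buffers chunk offsets off-heap in a RefCountedMemory instead of seeking around a RandomAccessFile, so the same buffer can back both the still-open writer and any CompressionMetadata handed out by openEarly(). A minimal sketch of the reference-counting contract this relies on (sizes and values are illustrative; only the RefCountedMemory calls appear in this patch):

    import org.apache.cassandra.cache.RefCountedMemory;

    class ChunkOffsetSharingSketch
    {
        public static void main(String[] args)
        {
            // the creator holds the initial reference
            RefCountedMemory offsets = new RefCountedMemory(8 * 4); // room for four long offsets
            offsets.setLong(0, 0L);
            offsets.setLong(8, 65536L);

            offsets.reference();   // an early-opened CompressionMetadata takes a share,
                                   // as the new private constructor above does
            offsets.unreference(); // the writer lets go; the memory stays alive
            offsets.unreference(); // last holder releases; the off-heap region is freed here
        }
    }

CompressionMetadata.close() follows the same rule: it unreferences shared offsets and only calls free() on plain, unshared Memory.
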
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 5d20652..7e7b364 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -83,7 +83,7 @@ public abstract class AbstractSSTableSimpleWriter
         int maxGen = 0;
         for (Descriptor desc : existing)
             maxGen = Math.max(maxGen, desc.generation);
-        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, true).filenameFor(Component.DATA);
+        return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP).filenameFor(Component.DATA);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 18609bf..b42abf4 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -122,29 +122,41 @@ public class Descriptor
         }
     }
 
+    public static enum Type
+    {
+        TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
+        public final boolean isTemporary;
+        public final String marker;
+        Type(String marker, boolean isTemporary)
+        {
+            this.isTemporary = isTemporary;
+            this.marker = marker;
+        }
+    }
+
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
     public final Version version;
     public final String ksname;
     public final String cfname;
     public final int generation;
-    public final boolean temporary;
+    public final Type type;
     private final int hashCode;
 
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */
-    public Descriptor(File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(File directory, String ksname, String cfname, int generation, Type temp)
     {
         this(Version.CURRENT, directory, ksname, cfname, generation, temp);
     }
 
-    public Descriptor(String version, File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(String version, File directory, String ksname, String cfname, int generation, Type temp)
     {
         this(new Version(version), directory, ksname, cfname, generation, temp);
     }
 
-    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, boolean temp)
+    public Descriptor(Version version, File directory, String ksname, String cfname, int generation, Type temp)
     {
         assert version != null && directory != null && ksname != null && cfname != null;
         this.version = version;
@@ -152,13 +164,13 @@ public class Descriptor
         this.ksname = ksname;
         this.cfname = cfname;
         this.generation = generation;
-        temporary = temp;
+        type = temp;
         hashCode = Objects.hashCode(directory, generation, ksname, cfname, temp);
     }
 
     public Descriptor withGeneration(int newGeneration)
     {
-        return new Descriptor(version, directory, ksname, cfname, newGeneration, temporary);
+        return new Descriptor(version, directory, ksname, cfname, newGeneration, type);
     }
 
     public String filenameFor(Component component)
@@ -172,8 +184,8 @@ public class Descriptor
         buff.append(directory).append(File.separatorChar);
         buff.append(ksname).append(separator);
         buff.append(cfname).append(separator);
-        if (temporary)
-            buff.append(SSTable.TEMPFILE_MARKER).append(separator);
+        if (type.isTemporary)
+            buff.append(type.marker).append(separator);
         buff.append(version).append(separator);
         buff.append(generation);
         return buff.toString();
@@ -231,10 +243,15 @@ public class Descriptor
 
         // optional temporary marker
         nexttok = st.nextToken();
-        boolean temporary = false;
-        if (nexttok.equals(SSTable.TEMPFILE_MARKER))
+        Type type = Type.FINAL;
+        if (nexttok.equals(Type.TEMP.marker))
+        {
+            type = Type.TEMP;
+            nexttok = st.nextToken();
+        }
+        else if (nexttok.equals(Type.TEMPLINK.marker))
         {
-            temporary = true;
+            type = Type.TEMPLINK;
             nexttok = st.nextToken();
         }
 
@@ -250,16 +267,16 @@ public class Descriptor
         if (!skipComponent)
             component = st.nextToken();
         directory = directory != null ? directory : new File(".");
-        return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, temporary), component);
+        return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, type), component);
     }
 
     /**
-     * @param temporary temporary flag
+     * @param type temporary flag
      * @return A clone of this descriptor with the given 'temporary' status.
      */
-    public Descriptor asTemporary(boolean temporary)
+    public Descriptor asType(Type type)
     {
-        return new Descriptor(version, directory, ksname, cfname, generation, temporary);
+        return new Descriptor(version, directory, ksname, cfname, generation, type);
     }
 
     public IMetadataSerializer getMetadataSerializer()
@@ -296,7 +313,7 @@ public class Descriptor
                        && that.generation == this.generation
                        && that.ksname.equals(this.ksname)
                        && that.cfname.equals(this.cfname)
-                       && that.temporary == this.temporary;
+                       && that.type == this.type;
     }
 
     @Override

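With the boolean temporary flag replaced by Descriptor.Type, the filename marker now distinguishes ordinary temporary files ("tmp") from the hard links created for early-opened readers ("tmplink"). A hedged sketch of how the three types surface in filenames (directory, keyspace, table and generation are made up; the version string depends on Descriptor.Version.CURRENT):

    import java.io.File;

    import org.apache.cassandra.io.sstable.Component;
    import org.apache.cassandra.io.sstable.Descriptor;

    class DescriptorTypeSketch
    {
        public static void main(String[] args)
        {
            File dir = new File("/var/lib/cassandra/data/ks/cf");
            Descriptor tmp = new Descriptor(dir, "ks", "cf", 42, Descriptor.Type.TEMP);

            System.out.println(tmp.filenameFor(Component.DATA));
            // .../ks-cf-tmp-<version>-42-Data.db
            System.out.println(tmp.asType(Descriptor.Type.TEMPLINK).filenameFor(Component.DATA));
            // .../ks-cf-tmplink-<version>-42-Data.db
            System.out.println(tmp.asType(Descriptor.Type.FINAL).filenameFor(Component.DATA));
            // .../ks-cf-<version>-42-Data.db (no marker on final files)
        }
    }
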
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0696fb7..f53a7e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -25,11 +25,11 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.MemoryOutputStream;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -60,7 +60,7 @@ public class IndexSummary implements Closeable
     private final IPartitioner partitioner;
     private final int summarySize;
     private final int sizeAtFullSampling;
-    private final Memory bytes;
+    private final RefCountedMemory bytes;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -70,7 +70,7 @@ public class IndexSummary implements Closeable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, Memory memory, int summarySize, int sizeAtFullSampling,
+    public IndexSummary(IPartitioner partitioner, RefCountedMemory memory, int summarySize, int sizeAtFullSampling,
                         int minIndexInterval, int samplingLevel)
     {
         this.partitioner = partitioner;
@@ -251,7 +251,7 @@ public class IndexSummary implements Closeable
                                                     " the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
             }
 
-            Memory memory = Memory.allocate(offheapSize);
+            RefCountedMemory memory = new RefCountedMemory(offheapSize);
             FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
             return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
@@ -260,6 +260,12 @@ public class IndexSummary implements Closeable
     @Override
     public void close()
     {
-        bytes.free();
+        bytes.unreference();
+    }
+
+    public IndexSummary readOnlyClone()
+    {
+        bytes.reference();
+        return this;
     }
 }

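readOnlyClone() exists so an early-opened reader and its eventual replacement can share one off-heap summary: it bumps the reference count and returns the same instance, and close() now drops a reference instead of freeing outright. A small sketch of that lifecycle (how the summary is obtained is assumed; readOnlyClone() and close() are from this patch):

    import org.apache.cassandra.io.sstable.IndexSummary;

    class SummarySharingSketch
    {
        // `summary` stands in for whatever IndexSummary the final reader ends up holding
        static void shareWithEarlyReader(IndexSummary summary)
        {
            IndexSummary forEarlyReader = summary.readOnlyClone(); // same instance, count bumped
            forEarlyReader.close(); // early reader retired: one reference dropped, bytes retained
            summary.close();        // final reader closed later: last reference, bytes freed
        }
    }
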
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index d77e887..8580dce 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
 
@@ -34,7 +36,7 @@ public class IndexSummaryBuilder
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
     private final ArrayList<Long> positions;
-    private final ArrayList<byte[]> keys;
+    private final ArrayList<DecoratedKey> keys;
     private final int minIndexInterval;
     private final int samplingLevel;
     private final int[] startPoints;
@@ -69,6 +71,27 @@ public class IndexSummaryBuilder
         keys = new ArrayList<>((int)maxExpectedEntries);
     }
 
+    // finds the last (-offset) decorated key that can be guaranteed to occur fully in the index file before the provided file position
+    public DecoratedKey getMaxReadableKey(long position, int offset)
+    {
+        int i = Collections.binarySearch(positions, position);
+        if (i < 0)
+        {
+            i = -1 - i;
+            if (i == positions.size())
+                i -= 2;
+            else
+                i -= 1;
+        }
+        else
+            i -= 1;
+        i -= offset;
+        // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
+        if (i <= 0)
+            return null;
+        return keys.get(i);
+    }
+
     public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
     {
         if (keysWritten % minIndexInterval == 0)
@@ -86,9 +109,8 @@ public class IndexSummaryBuilder
 
             if (!shouldSkip)
             {
-                byte[] key = ByteBufferUtil.getArray(decoratedKey.key);
-                keys.add(key);
-                offheapSize += key.length;
+                keys.add(decoratedKey);
+                offheapSize += decoratedKey.key.remaining();
                 positions.add(indexPosition);
                 offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
             }
@@ -102,32 +124,51 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
+        return build(partitioner, null);
+    }
+
+    public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+    {
         assert keys.size() > 0;
         assert keys.size() == positions.size();
 
+        int length;
+        if (exclusiveUpperBound == null)
+            length = keys.size();
+        else
+            length = Collections.binarySearch(keys, exclusiveUpperBound);
+
+        assert length > 0;
+
+        long offheapSize = this.offheapSize;
+        if (length < keys.size())
+            for (int i = length ; i < keys.size() ; i++)
+                offheapSize -= keys.get(i).key.remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
+
         // first we write out the position in the *summary* for each key in the summary,
         // then we write out (key, actual index position) pairs
-        Memory memory = Memory.allocate(offheapSize + (keys.size() * 4));
+        RefCountedMemory memory = new RefCountedMemory(offheapSize + (length * 4));
         int idxPosition = 0;
-        int keyPosition = keys.size() * 4;
-        for (int i = 0; i < keys.size(); i++)
+        int keyPosition = length * 4;
+        for (int i = 0; i < length; i++)
         {
             // write the position of the actual entry in the index summary (4 bytes)
             memory.setInt(idxPosition, keyPosition);
             idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
 
             // write the key
-            byte[] keyBytes = keys.get(i);
-            memory.setBytes(keyPosition, keyBytes, 0, keyBytes.length);
-            keyPosition += keyBytes.length;
+            ByteBuffer keyBytes = keys.get(i).key;
+            memory.setBytes(keyPosition, keyBytes);
+            keyPosition += keyBytes.remaining();
 
             // write the position in the actual index file
             long actualIndexPosition = positions.get(i);
             memory.setLong(keyPosition, actualIndexPosition);
             keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
         }
+        assert keyPosition == offheapSize + (length * 4);
         int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
-        return new IndexSummary(partitioner, memory, keys.size(), sizeAtFullSampling, minIndexInterval, samplingLevel);
+        return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
     }
 
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -190,7 +231,7 @@ public class IndexSummaryBuilder
 
         // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
         // stores the position of the actual entries in the summary.
-        Memory memory = Memory.allocate(newOffHeapSize - (removedKeyCount * 4));
+        RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
 
         // Copy old entries to our new Memory.
         int idxPosition = 0;

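build(partitioner, exclusiveUpperBound) is what lets SSTableWriter.openEarly() publish a summary that stops at the last key known to be fully flushed, and getMaxReadableKey() finds that key by binary-searching the recorded index positions. A sketch of the intended call pattern (the sampled keys, their index positions and the size hints are assumptions; the builder API is from this patch):

    import java.util.List;

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.io.sstable.Downsampling;
    import org.apache.cassandra.io.sstable.IndexSummary;
    import org.apache.cassandra.io.sstable.IndexSummaryBuilder;

    class PartialSummarySketch
    {
        // sampledKeys/indexPositions mirror what IndexWriter.append() feeds the builder
        static IndexSummary buildUpTo(IPartitioner partitioner,
                                      List<DecoratedKey> sampledKeys, List<Long> indexPositions,
                                      long flushedIndexLength, long expectedKeys, int minIndexInterval)
        {
            IndexSummaryBuilder builder =
                new IndexSummaryBuilder(expectedKeys, minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL);
            for (int i = 0; i < sampledKeys.size(); i++)
                builder.maybeAddEntry(sampledKeys.get(i), indexPositions.get(i));

            // last key whose index entry is known to lie entirely before flushedIndexLength
            DecoratedKey bound = builder.getMaxReadableKey(flushedIndexLength, 0);
            if (bound == null)
                return null; // nothing is safely readable yet

            // entries from `bound` onwards are dropped; it becomes the exclusive upper bound
            return builder.build(partitioner, bound);
        }
    }
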
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 5ee9bdb..247343e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -56,8 +56,6 @@ public abstract class SSTable
 {
     static final Logger logger = LoggerFactory.getLogger(SSTable.class);
 
-    public static final String TEMPFILE_MARKER = "tmp";
-
     public static final int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
 
     public final Descriptor descriptor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index c330c88..7b9d135 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -79,7 +79,7 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 Pair<Descriptor, Component> p = SSTable.tryComponentFromFilename(dir, name);
                 Descriptor desc = p == null ? null : p.left;
-                if (p == null || !p.right.equals(Component.DATA) || desc.temporary)
+                if (p == null || !p.right.equals(Component.DATA) || desc.type.isTemporary)
                     return false;
 
                 if (!new File(desc.filenameFor(Component.PRIMARY_INDEX)).exists())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8e359bd..c84eec2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -193,6 +193,7 @@ public class SSTableReader extends SSTable
     private SSTableReader replacedBy;
     private SSTableReader replaces;
     private SSTableDeletingTask deletingTask;
+    private Runnable runOnClose;
 
     @VisibleForTesting
     public RestorableMeter readMeter;
@@ -340,7 +341,7 @@ public class SSTableReader extends SSTable
         // special implementation of load to use non-pooled SegmentedFile builders
         SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
         SegmentedFile.Builder dbuilder = sstable.compression
-                                       ? new CompressedSegmentedFile.Builder()
+                                       ? new CompressedSegmentedFile.Builder(null)
                                        : new BufferedSegmentedFile.Builder();
         if (!sstable.loadSummary(ibuilder, dbuilder))
             sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
@@ -555,7 +556,7 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
+            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 
             if (replacedBy != null)
             {
@@ -563,7 +564,7 @@ public class SSTableReader extends SSTable
                 closeSummary = replacedBy.indexSummary != indexSummary;
                 closeFiles = replacedBy.dfile != dfile;
                 // if the replacement sstablereader uses a different path, clean up our paths
-                deleteFile = !dfile.path.equals(replacedBy.dfile.path);
+                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
             }
 
             if (replaces != null)
@@ -571,7 +572,7 @@ public class SSTableReader extends SSTable
                 closeBf &= replaces.bf != bf;
                 closeSummary &= replaces.indexSummary != indexSummary;
                 closeFiles &= replaces.dfile != dfile;
-                deleteFile &= !dfile.path.equals(replaces.dfile.path);
+                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
             }
 
             boolean deleteAll = false;
@@ -597,12 +598,15 @@ public class SSTableReader extends SSTable
                     replacedBy.replaces = replaces;
             }
 
-            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
         }
     }
 
     private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
     {
+        if (references.get() != 0)
+            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
+
         final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
         final OpOrder.Barrier barrier;
         if (cfs != null)
@@ -619,7 +623,6 @@ public class SSTableReader extends SSTable
             {
                 if (barrier != null)
                     barrier.await();
-                assert references.get() == 0;
                 if (closeBf)
                     bf.close();
                 if (closeSummary)
@@ -629,6 +632,8 @@ public class SSTableReader extends SSTable
                     ifile.cleanup();
                     dfile.cleanup();
                 }
+                if (runOnClose != null)
+                    runOnClose.run();
                 if (deleteAll)
                 {
                     /**
@@ -650,6 +655,16 @@ public class SSTableReader extends SSTable
         });
     }
 
+    public boolean equals(Object that)
+    {
+        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
+    }
+
+    public int hashCode()
+    {
+        return this.descriptor.hashCode();
+    }
+
     public String getFilename()
     {
         return dfile.path;
@@ -894,6 +909,53 @@ public class SSTableReader extends SSTable
         }
     }
 
+    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
+    {
+        synchronized (replaceLock)
+        {
+            assert replacedBy == null;
+
+            if (newStart.compareTo(this.first) > 0)
+            {
+                if (newStart.compareTo(this.last) > 0)
+                {
+                    this.runOnClose = new Runnable()
+                    {
+                        public void run()
+                        {
+                            CLibrary.trySkipCache(dfile.path, 0, 0);
+                            CLibrary.trySkipCache(ifile.path, 0, 0);
+                            runOnClose.run();
+                        }
+                    };
+                }
+                else
+                {
+                    final long dataStart = getPosition(newStart, Operator.GE).position;
+                    final long indexStart = getIndexScanPosition(newStart);
+                    this.runOnClose = new Runnable()
+                    {
+                        public void run()
+                        {
+                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
+                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
+                            runOnClose.run();
+                        }
+                    };
+                }
+            }
+
+            if (readMeterSyncFuture != null)
+                readMeterSyncFuture.cancel(false);
+            SSTableReader replacement = new SSTableReader(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata);
+            replacement.readMeter = this.readMeter;
+            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
+            replacement.last = this.last;
+            setReplacedBy(replacement);
+            return replacement;
+        }
+    }
+
     /**
      * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
      * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
@@ -1022,7 +1084,7 @@ public class SSTableReader extends SSTable
         return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
     }
 
-    public static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
+    private static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
     {
         if (binarySearchResult == -1)
             return -1;
@@ -1245,6 +1307,12 @@ public class SSTableReader extends SSTable
         return positions;
     }
 
+    public void invalidateCacheKey(DecoratedKey key)
+    {
+        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.key);
+        keyCache.remove(cacheKey);
+    }
+
     public void cacheKey(DecoratedKey key, RowIndexEntry info)
     {
         CachingOptions caching = metadata.getCaching();
@@ -1261,29 +1329,6 @@ public class SSTableReader extends SSTable
         keyCache.put(cacheKey, info);
     }
 
-    public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
-    {
-        RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
-
-        try
-        {
-            int fd = CLibrary.getfd(f.getFD());
-
-            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
-            {
-                cacheKey(entry.getKey(), entry.getValue());
-
-                // add to the cache but don't do actual preheating if we have it disabled in the config
-                if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
-                    CLibrary.preheatPage(fd, entry.getValue().position);
-            }
-        }
-        finally
-        {
-            FileUtils.closeQuietly(f);
-        }
-    }
-
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
         return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.key), updateStats);
@@ -1662,6 +1707,20 @@ public class SSTableReader extends SSTable
         return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
     }
 
+    public SSTableReader getCurrentReplacement()
+    {
+        synchronized (replaceLock)
+        {
+            SSTableReader cur = this, next = replacedBy;
+            while (next != null)
+            {
+                cur = next;
+                next = next.replacedBy;
+            }
+            return cur;
+        }
+    }
+
     /**
      * TODO: Move someplace reusable
      */

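cloneWithNewStart() and getCurrentReplacement() are the reader-side half of the preemptive open: as the rewrite progresses, the live view of an old sstable keeps shrinking from the front, and page-cache/key-cache cleanup is deferred to the runOnClose hook so it only runs once the superseded reader is fully released. A sketch of how a caller walks the replacement chain and installs that hook (reader and newFirstKey are stand-ins; the methods are from this patch):

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.io.sstable.SSTableReader;

    class ShrinkReaderSketch
    {
        static SSTableReader shrinkFrom(SSTableReader reader, DecoratedKey newFirstKey)
        {
            // always operate on the most recent clone, in case this reader was already replaced
            SSTableReader live = reader.getCurrentReplacement();
            return live.cloneWithNewStart(newFirstKey, new Runnable()
            {
                public void run()
                {
                    // runs once the superseded reader is released; by then everything below
                    // newFirstKey is served by the new output, so its cache entries can go
                }
            });
        }
    }
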
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
new file mode 100644
index 0000000..2dfefc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.utils.CLibrary;
+
+public class SSTableRewriter
+{
+
+    private static final long preemptiveOpenInterval;
+    static
+    {
+        long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
+        if (interval < 0)
+            interval = Long.MAX_VALUE;
+        preemptiveOpenInterval = interval;
+    }
+
+    private final DataTracker dataTracker;
+    private final ColumnFamilyStore cfs;
+
+    private final long maxAge;
+    private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
+    private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
+    private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
+
+    private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
+    private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
+
+    private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+    private final OperationType rewriteType; // the type of rewrite/compaction being performed
+    private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+
+    private SSTableWriter writer;
+    private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+    {
+        this.rewriting = rewriting;
+        for (SSTableReader sstable : rewriting)
+        {
+            originalStarts.put(sstable.descriptor, sstable.first);
+            fileDescriptors.put(sstable.descriptor, CLibrary.getfd(sstable.getFilename()));
+        }
+        this.dataTracker = cfs.getDataTracker();
+        this.cfs = cfs;
+        this.maxAge = maxAge;
+        this.rewriteType = rewriteType;
+        this.isOffline = isOffline;
+    }
+
+    public SSTableWriter currentWriter()
+    {
+        return writer;
+    }
+
+    public RowIndexEntry append(AbstractCompactedRow row)
+    {
+        // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
+        maybeReopenEarly(row.key);
+        RowIndexEntry index = writer.append(row);
+        if (!isOffline)
+        {
+            if (index == null)
+            {
+                cfs.invalidateCachedRow(row.key);
+            }
+            else
+            {
+                boolean save = false;
+                for (SSTableReader reader : rewriting)
+                {
+                    if (reader.getCachedPosition(row.key, false) != null)
+                    {
+                        save = true;
+                        break;
+                    }
+                }
+                if (save)
+                    cachedKeys.put(row.key, index);
+            }
+        }
+        return index;
+    }
+
+    // attempts to append the row, if fails resets the writer position
+    public RowIndexEntry tryAppend(AbstractCompactedRow row)
+    {
+        mark();
+        try
+        {
+            return append(row);
+        }
+        catch (Throwable t)
+        {
+            resetAndTruncate();
+            throw t;
+        }
+    }
+
+    private void mark()
+    {
+        writer.mark();
+    }
+
+    private void resetAndTruncate()
+    {
+        writer.resetAndTruncate();
+    }
+
+    private void maybeReopenEarly(DecoratedKey key)
+    {
+        if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
+        {
+            if (isOffline)
+            {
+                for (SSTableReader reader : rewriting)
+                {
+                    RowIndexEntry index = reader.getPosition(key, SSTableReader.Operator.GE);
+                    CLibrary.trySkipCache(fileDescriptors.get(reader.descriptor), 0, index == null ? 0 : index.position);
+                }
+            }
+            else
+            {
+                SSTableReader reader = writer.openEarly(maxAge);
+                if (reader != null)
+                {
+                    replaceReader(currentlyOpenedEarly, reader);
+                    currentlyOpenedEarly = reader;
+                    currentlyOpenedEarlyAt = writer.getFilePointer();
+                    moveStarts(reader, Functions.constant(reader.last), false);
+                }
+            }
+        }
+    }
+
+    public void abort()
+    {
+        if (writer == null)
+            return;
+        moveStarts(null, Functions.forMap(originalStarts), true);
+        List<SSTableReader> close = new ArrayList<>(finished);
+        if (currentlyOpenedEarly != null)
+            close.add(currentlyOpenedEarly);
+        // also remove already completed SSTables
+        for (SSTableReader sstable : close)
+            sstable.markObsolete();
+        // releases reference in replaceReaders
+        if (!isOffline)
+        {
+            dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList());
+            dataTracker.unmarkCompacting(close);
+        }
+        writer.abort();
+    }
+
+    /**
+     * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
+     * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
+     * is true, we are instead restoring the starts of the readers from before the rewriting began
+     *
+     * @param newReader the rewritten reader that replaces them for this region
+     * @param newStarts a function mapping a reader's descriptor to their new start value
+     * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
+     */
+    private void moveStarts(SSTableReader newReader, Function<? super Descriptor, DecoratedKey> newStarts, boolean reset)
+    {
+        if (isOffline)
+            return;
+        List<SSTableReader> toReplace = new ArrayList<>();
+        List<SSTableReader> replaceWith = new ArrayList<>();
+        final List<DecoratedKey> invalidateKeys = new ArrayList<>();
+        if (!reset)
+        {
+            invalidateKeys.addAll(cachedKeys.keySet());
+            for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
+                newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
+        }
+        cachedKeys = new HashMap<>();
+        for (final SSTableReader sstable : rewriting)
+        {
+            DecoratedKey newStart = newStarts.apply(sstable.descriptor);
+            assert newStart != null;
+            if (sstable.first.compareTo(newStart) < 0 || (reset && newStart != sstable.first))
+            {
+                toReplace.add(sstable);
+                // we call getCurrentReplacement() to support multiple rewriters operating over the same source readers at once.
+                // note: only one such writer should be written to at any moment
+                replaceWith.add(sstable.getCurrentReplacement().cloneWithNewStart(newStart, new Runnable()
+                {
+                    public void run()
+                    {
+                        // this is somewhat racey, in that we could theoretically be closing this old reader
+                        // when an even older reader is still in use, but it's not likely to have any major impact
+                        for (DecoratedKey key : invalidateKeys)
+                            sstable.invalidateCacheKey(key);
+                    }
+                }));
+            }
+        }
+        replaceReaders(toReplace, replaceWith);
+        rewriting.removeAll(toReplace);
+        rewriting.addAll(replaceWith);
+    }
+
+    private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith)
+    {
+        if (isOffline)
+            return;
+        Set<SSTableReader> toReplaceSet;
+        if (toReplace != null)
+        {
+            toReplace.setReplacedBy(replaceWith);
+            toReplaceSet = Collections.singleton(toReplace);
+        }
+        else
+        {
+            dataTracker.markCompacting(Collections.singleton(replaceWith));
+            toReplaceSet = Collections.emptySet();
+        }
+        replaceReaders(toReplaceSet, Collections.singleton(replaceWith));
+    }
+
+    private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        if (isOffline)
+            return;
+        dataTracker.replaceReaders(toReplace, replaceWith);
+    }
+
+    public void switchWriter(SSTableWriter newWriter)
+    {
+        if (writer == null)
+        {
+            writer = newWriter;
+            return;
+        }
+        // tmp = false because later we want to query it with descriptor from SSTableReader
+        SSTableReader reader = writer.closeAndOpenReader(maxAge);
+        finished.add(reader);
+        replaceReader(currentlyOpenedEarly, reader);
+        moveStarts(reader, Functions.constant(reader.last), false);
+        currentlyOpenedEarly = null;
+        currentlyOpenedEarlyAt = 0;
+        writer = newWriter;
+    }
+
+    public void finish()
+    {
+        finish(-1);
+    }
+    public void finish(long repairedAt)
+    {
+        finish(true, repairedAt);
+    }
+    public void finish(boolean cleanupOldReaders)
+    {
+        finish(cleanupOldReaders, -1);
+    }
+    public void finish(boolean cleanupOldReaders, long repairedAt)
+    {
+        if (writer.getFilePointer() > 0)
+        {
+            SSTableReader reader = repairedAt < 0 ?
+                                    writer.closeAndOpenReader(maxAge) :
+                                    writer.closeAndOpenReader(maxAge, repairedAt);
+            finished.add(reader);
+            replaceReader(currentlyOpenedEarly, reader);
+            moveStarts(reader, Functions.constant(reader.last), false);
+        }
+        else
+        {
+            writer.abort();
+            writer = null;
+        }
+
+        if (!isOffline)
+        {
+            dataTracker.unmarkCompacting(finished);
+            if (cleanupOldReaders)
+                dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
+        }
+        else if (cleanupOldReaders)
+        {
+            for (SSTableReader reader : rewriting)
+            {
+                reader.markObsolete();
+                reader.releaseReference();
+            }
+        }
+    }
+
+    public List<SSTableReader> finished()
+    {
+        return finished;
+    }
+}

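SSTableRewriter is the driver the compaction code is expected to use: switchWriter() installs the current output, append()/tryAppend() feed it while maybeReopenEarly() periodically publishes a partially written reader and moves the starts of the source sstables forward, and finish()/abort() settle the DataTracker bookkeeping. A sketch of the intended call pattern (cfs, sstables, maxAge, the row iterator and the writer are assumptions standing in for what a compaction task supplies):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.compaction.AbstractCompactedRow;
    import org.apache.cassandra.db.compaction.OperationType;
    import org.apache.cassandra.io.sstable.SSTableReader;
    import org.apache.cassandra.io.sstable.SSTableRewriter;
    import org.apache.cassandra.io.sstable.SSTableWriter;

    class RewriteSketch
    {
        static List<SSTableReader> rewrite(ColumnFamilyStore cfs, Set<SSTableReader> sstables,
                                           long maxAge, Iterator<AbstractCompactedRow> rows,
                                           SSTableWriter firstWriter)
        {
            SSTableRewriter rewriter = new SSTableRewriter(cfs, sstables, maxAge, OperationType.COMPACTION, false);
            rewriter.switchWriter(firstWriter);
            try
            {
                while (rows.hasNext())
                {
                    rewriter.tryAppend(rows.next()); // resets the writer position before rethrowing on failure
                    // every preemptiveOpenInterval bytes, maybeReopenEarly() publishes the partial
                    // output and moves the starts of the source sstables past what it now covers
                }
                rewriter.finish();       // close the final writer and swap readers in the DataTracker
                return rewriter.finished();
            }
            catch (Throwable t)
            {
                rewriter.abort();        // restore the original starts and discard partial output
                throw t;
            }
        }
    }
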
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4a7729e..1c9c5fd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -17,26 +17,51 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.*;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnIndex;
+import org.apache.cassandra.db.ColumnSerializer;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FilterFactory;
@@ -110,17 +135,16 @@ public class SSTableWriter extends SSTable
 
         if (compression)
         {
-            dbuilder = SegmentedFile.getCompressedBuilder();
             dataFile = SequentialWriter.open(getFilename(),
                                              descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                             !metadata.populateIoCacheOnFlush(),
                                              metadata.compressionParameters(),
                                              sstableMetadataCollector);
+            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
         }
         else
         {
+            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
         }
 
         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -299,9 +323,16 @@ public class SSTableWriter extends SSTable
      */
     public void abort()
     {
-        assert descriptor.temporary;
-        FileUtils.closeQuietly(iwriter);
-        FileUtils.closeQuietly(dataFile);
+        assert descriptor.type.isTemporary;
+        if (iwriter == null && dataFile == null)
+            return;
+        if (iwriter != null)
+        {
+            FileUtils.closeQuietly(iwriter.indexFile);
+            iwriter.bf.close();
+        }
+        if (dataFile!= null)
+            FileUtils.closeQuietly(dataFile);
 
         Set<Component> components = SSTable.componentsFor(descriptor);
         try
@@ -326,6 +357,54 @@ public class SSTableWriter extends SSTable
         last = lastWrittenKey = getMinimalKey(last);
     }
 
+    public SSTableReader openEarly(long maxDataAge)
+    {
+        StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                  metadata.getBloomFilterFpChance(),
+                                                  repairedAt).get(MetadataType.STATS);
+
+        // find the max (exclusive) readable key
+        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
+        if (exclusiveUpperBoundOfReadableIndex == null)
+            return null;
+
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+
+        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
+        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
+                                                           components, metadata,
+                                                           partitioner, ifile,
+                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+                                                           iwriter.bf, maxDataAge, sstableMetadata);
+
+        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
+        sstable.first = getMinimalKey(first);
+        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
+        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
+        if (inclusiveUpperBoundOfReadableData == null)
+            return null;
+        int offset = 2;
+        while (true)
+        {
+            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
+            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
+                break;
+            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
+            if (inclusiveUpperBoundOfReadableData == null)
+                return null;
+        }
+        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+        return sstable;
+    }
+
     public SSTableReader closeAndOpenReader()
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -395,7 +474,7 @@ public class SSTableWriter extends SSTable
 
     private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
-        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
+        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
         try
         {
             desc.getMetadataSerializer().serialize(components, out.stream);
@@ -412,7 +491,7 @@ public class SSTableWriter extends SSTable
 
     static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
     {
-        Descriptor newdesc = tmpdesc.asTemporary(false);
+        Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
         rename(tmpdesc, newdesc, components);
         return newdesc;
     }
@@ -454,13 +533,19 @@ public class SSTableWriter extends SSTable
 
         IndexWriter(long keyCount)
         {
-            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
-                                              !metadata.populateIoCacheOnFlush());
+            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
             summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
         }
 
+        // finds the last decorated key, counting back 'offset' entries from the end, that is guaranteed to occur fully in the flushed portion of the index file
+        DecoratedKey getMaxReadableKey(int offset)
+        {
+            long maxIndexLength = indexFile.getLastFlushOffset();
+            return summary.getMaxReadableKey(maxIndexLength, offset);
+        }
+
         public void append(DecoratedKey key, RowIndexEntry indexEntry)
         {
             bf.add(key.key);
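
The openEarly() method above is the heart of the preemptive-open change: it hard-links the in-progress index and data files under a TEMPLINK descriptor, opens a reader over them with a FINAL descriptor type, and then walks backwards through the index summary until it finds a last key whose row data has actually been flushed. A rough sketch of how a caller might drive it follows; the loop shape and the helper names (iter, shouldCheckpoint, replaceReaders, maxDataAge) are illustrative assumptions, not part of this patch.

    // Hypothetical compaction-side usage of SSTableWriter.openEarly (sketch only):
    SSTableReader earlyOpened = null;
    while (iter.hasNext())
    {
        writer.append(iter.next());
        if (shouldCheckpoint())                          // e.g. every N MB written
        {
            SSTableReader candidate = writer.openEarly(maxDataAge);
            if (candidate != null)                       // null until enough index and data have been flushed
            {
                replaceReaders(earlyOpened, candidate);  // route reads to the partially written sstable
                earlyOpened = candidate;
            }
        }
    }
    SSTableReader finished = writer.closeAndOpenReader();
    replaceReaders(earlyOpened, finished);               // finally swap in the completed reader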

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84789a6..7ba2895 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -19,15 +19,25 @@ package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.ColumnNameHelper;
+import org.apache.cassandra.io.sstable.ColumnStats;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;
@@ -246,8 +256,8 @@ public class MetadataCollector
                                                              compressionRatio,
                                                              estimatedTombstoneDropTime,
                                                              sstableLevel,
-                                                             minColumnNames,
-                                                             maxColumnNames,
+                                                             ImmutableList.copyOf(minColumnNames),
+                                                             ImmutableList.copyOf(maxColumnNames),
                                                              hasLegacyCounterShards,
                                                              repairedAt));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
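
The switch to ImmutableList.copyOf is worth noting: finalizeMetadata() can now be called by openEarly() while the collector is still being fed, so handing out the live minColumnNames/maxColumnNames lists would let later mutations leak into metadata already published to an early-opened reader (that rationale is inferred here, not stated in the patch). A standalone sketch of the difference:

    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.collect.ImmutableList;

    public class CopyOnFinalizeSketch
    {
        public static void main(String[] args)
        {
            List<String> live = new ArrayList<>();
            live.add("a");
            List<String> published = ImmutableList.copyOf(live);  // snapshot taken at finalize time
            live.add("b");                                         // the collector keeps mutating its list
            System.out.println(published);                         // prints [a]; the snapshot is unaffected
        }
    }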

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index c0cd96e..7414208 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -146,7 +146,7 @@ public class MetadataSerializer implements IMetadataSerializer
 
     private void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
     {
-        Descriptor tmpDescriptor = descriptor.asTemporary(true);
+        Descriptor tmpDescriptor = descriptor.asType(Descriptor.Type.TEMP);
 
         try (DataOutputStreamAndChannel out = new DataOutputStreamAndChannel(new FileOutputStream(tmpDescriptor.filenameFor(Component.STATS))))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 6a23fde..b284f61 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             long length = new File(path).length();
             return new BufferedPoolingSegmentedFile(path, length);
         }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
     }
 
     protected RandomAccessReader createReader(String path)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index 790b42b..aa031e3 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -38,6 +38,11 @@ public class BufferedSegmentedFile extends SegmentedFile
             long length = new File(path).length();
             return new BufferedSegmentedFile(path, length);
         }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
     }
 
     public FileDataInput getSegment(long position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 3c4c257..98492da 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -9,10 +9,10 @@ public class ChecksummedSequentialWriter extends SequentialWriter
     private final SequentialWriter crcWriter;
     private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 
-    public ChecksummedSequentialWriter(File file, int bufferSize, boolean skipIOCache, File crcPath)
+    public ChecksummedSequentialWriter(File file, int bufferSize, File crcPath)
     {
-        super(file, bufferSize, skipIOCache);
-        crcWriter = new SequentialWriter(crcPath, 8 * 1024, true);
+        super(file, bufferSize);
+        crcWriter = new SequentialWriter(crcPath, 8 * 1024);
         crcMetadata = new DataIntegrityMetadata.ChecksumWriter(crcWriter.stream);
         crcMetadata.writeChunkSize(buffer.length);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 121bdb2..1803e69 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
@@ -30,8 +31,13 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
         this.metadata = metadata;
     }
 
-    public static class Builder extends SegmentedFile.Builder
+    public static class Builder extends CompressedSegmentedFile.Builder
     {
+        public Builder(CompressedSequentialWriter writer)
+        {
+            super(writer);
+        }
+
         public void addPotentialBoundary(long boundary)
         {
             // only one segment in a standard-io file
@@ -39,7 +45,12 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
 
         public SegmentedFile complete(String path)
         {
-            return new CompressedPoolingSegmentedFile(path, CompressionMetadata.create(path));
+            return new CompressedPoolingSegmentedFile(path, metadata(path, false));
+        }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return new CompressedPoolingSegmentedFile(path, metadata(path, true));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index d0ea3fd..4afe0a0 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.io.util;
 
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
@@ -32,14 +33,35 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
 
     public static class Builder extends SegmentedFile.Builder
     {
+        protected final CompressedSequentialWriter writer;
+        public Builder(CompressedSequentialWriter writer)
+        {
+            this.writer = writer;
+        }
+
         public void addPotentialBoundary(long boundary)
         {
             // only one segment in a standard-io file
         }
 
+        protected CompressionMetadata metadata(String path, boolean early)
+        {
+            if (writer == null)
+                return CompressionMetadata.create(path);
+            else if (early)
+                return writer.openEarly();
+            else
+                return writer.openAfterClose();
+        }
+
         public SegmentedFile complete(String path)
         {
-            return new CompressedSegmentedFile(path, CompressionMetadata.create(path));
+            return new CompressedSegmentedFile(path, metadata(path, false));
+        }
+
+        public SegmentedFile openEarly(String path)
+        {
+            return new CompressedSegmentedFile(path, metadata(path, true));
         }
     }
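
With the writer captured by the Builder, compression metadata no longer has to be read back from the -CompressionInfo.db component: openEarly() asks the live CompressedSequentialWriter for a snapshot of the chunk offsets written so far, complete() asks it for the final metadata after close, and a Builder constructed with a null writer still falls back to CompressionMetadata.create(path). A hedged usage sketch (the variable names are assumptions):

    CompressedSegmentedFile.Builder builder = new CompressedSegmentedFile.Builder(compressedWriter);
    SegmentedFile early = builder.openEarly(dataPath);   // metadata(dataPath, true)  -> writer.openEarly()
    // ... more chunks are written and flushed ...
    SegmentedFile done  = builder.complete(dataPath);    // metadata(dataPath, false) -> writer.openAfterClose()

    // writer-less path, e.g. when opening an already-written sstable:
    SegmentedFile fromDisk = new CompressedSegmentedFile.Builder(null).complete(dataPath);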
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 15d890c..875c9d5 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -74,6 +74,11 @@ public class FileUtils
         canCleanDirectBuffers = canClean;
     }
 
+    public static void createHardLink(String from, String to)
+    {
+        createHardLink(new File(from), new File(to));
+    }
+
     public static void createHardLink(File from, File to)
     {
         if (to.exists())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index 278e0f6..b8a46bc 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import com.sun.jna.Native;
-import com.sun.jna.Pointer;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
 
 /**
  * An off-heap region of memory that must be manually free'd when no longer needed.
@@ -145,6 +145,24 @@ public class Memory
         }
     }
 
+    public void setBytes(long memoryOffset, ByteBuffer buffer)
+    {
+        if (buffer == null)
+            throw new NullPointerException();
+        else if (buffer.remaining() == 0)
+            return;
+        checkPosition(memoryOffset + buffer.remaining());
+        if (buffer.hasArray())
+        {
+            setBytes(memoryOffset, buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        }
+        else if (buffer instanceof DirectBuffer)
+        {
+            unsafe.copyMemory(((DirectBuffer) buffer).address() + buffer.position(), peer + memoryOffset, buffer.remaining());
+        }
+        else
+            throw new IllegalStateException();
+    }
     /**
      * Transfers count bytes from buffer to Memory
      *
@@ -263,6 +281,18 @@ public class Memory
         assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
     }
 
+    public void put(long trgOffset, Memory memory, long srcOffset, long size)
+    {
+        unsafe.copyMemory(memory.peer + srcOffset, peer + trgOffset, size);
+    }
+
+    public Memory copy(long newSize)
+    {
+        Memory copy = Memory.allocate(newSize);
+        copy.put(0, this, 0, Math.min(size(), newSize));
+        return copy;
+    }
+
     public void free()
     {
         assert peer != 0;
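
The new Memory methods appear to exist so that a partially built chunk-offset table can be snapshotted and grown off-heap: setBytes(long, ByteBuffer) bulk-copies from either heap or direct buffers, put() copies between Memory regions, and copy(newSize) reallocates while preserving the existing prefix. A small standalone sketch, assuming only the pre-existing allocate/setLong/getLong/free API alongside the methods added here:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    import org.apache.cassandra.io.util.Memory;

    public class MemoryCopySketch
    {
        public static void main(String[] args)
        {
            Memory offsets = Memory.allocate(16);
            offsets.setLong(0, 42L);
            offsets.setLong(8, 99L);

            // grow the region, keeping the bytes already written
            Memory bigger = offsets.copy(32);
            assert bigger.getLong(8) == 99L;

            // bulk-copy from a direct (or heap) ByteBuffer; native order matches Memory's accessors
            ByteBuffer direct = ByteBuffer.allocateDirect(8).order(ByteOrder.nativeOrder());
            direct.putLong(7L);
            direct.flip();
            bigger.setBytes(16, direct);
            assert bigger.getLong(16) == 7L;

            offsets.free();
            bigger.free();
        }
    }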

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 39a4160..450553b 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -161,28 +161,36 @@ public class MmappedSegmentedFile extends SegmentedFile
         public SegmentedFile complete(String path)
         {
             long length = new File(path).length();
-            // add a sentinel value == length
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
             // create the segments
             return new MmappedSegmentedFile(path, length, createSegments(path));
         }
 
+        public SegmentedFile openEarly(String path)
+        {
+            return complete(path);
+        }
+
         private Segment[] createSegments(String path)
         {
-            int segcount = boundaries.size() - 1;
-            Segment[] segments = new Segment[segcount];
             RandomAccessFile raf;
-
+            long length;
             try
             {
                 raf = new RandomAccessFile(path, "r");
+                length = raf.length();
             }
-            catch (FileNotFoundException e)
+            catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
 
+            // add a sentinel value == length
+            List<Long> boundaries = new ArrayList<>(this.boundaries);
+            if (length != boundaries.get(boundaries.size() - 1))
+                boundaries.add(length);
+            int segcount = boundaries.size() - 1;
+            Segment[] segments = new Segment[segcount];
+
             try
             {
                 for (int i = 0; i < segcount; i++)
@@ -221,7 +229,7 @@ public class MmappedSegmentedFile extends SegmentedFile
             super.deserializeBounds(in);
 
             int size = in.readInt();
-            List<Long> temp = new ArrayList<Long>(size);
+            List<Long> temp = new ArrayList<>(size);
             
             for (int i = 0; i < size; i++)
                 temp.add(in.readLong());
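
Moving the sentinel boundary out of complete() and into a local copy inside createSegments() is what makes the mmapped builder reusable: each call reads the file's length at that moment and appends the sentinel to a private list, so the builder's own boundary list is never polluted and openEarly() (which simply delegates to complete()) can be invoked repeatedly while the file keeps growing. A standalone illustration of that per-call sentinel logic, with made-up values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SentinelSketch
    {
        public static void main(String[] args)
        {
            List<Long> recorded = Arrays.asList(0L, 1_048_576L, 2_097_152L);  // the builder's boundaries
            long lengthNow = 2_500_000L;                                      // file length at open time

            List<Long> local = new ArrayList<>(recorded);                     // never mutate the builder's list
            if (lengthNow != local.get(local.size() - 1))
                local.add(lengthNow);                                         // sentinel == current length

            System.out.println((local.size() - 1) + " segments over " + local);  // 3 segments over [0, 1048576, 2097152, 2500000]
        }
    }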

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 892611c..01f4e31 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
+    final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
     protected PoolingSegmentedFile(String path, long length)
     {
         super(path, length);
@@ -33,7 +34,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = FileCacheService.instance.get(path);
+        RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
 
         if (reader == null)
             reader = createReader(path);
@@ -46,11 +47,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public void recycle(RandomAccessReader reader)
     {
-        FileCacheService.instance.put(reader);
+        FileCacheService.instance.put(cacheKey, reader);
     }
 
     public void cleanup()
     {
-        FileCacheService.instance.invalidate(path);
+        FileCacheService.instance.invalidate(cacheKey, path);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index d4da177..be549a6 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -28,6 +28,7 @@ import java.util.NoSuchElementException;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -75,7 +76,12 @@ public abstract class SegmentedFile
 
     public static Builder getCompressedBuilder()
     {
-        return new CompressedPoolingSegmentedFile.Builder();
+        return getCompressedBuilder(null);
+    }
+
+    public static Builder getCompressedBuilder(CompressedSequentialWriter writer)
+    {
+        return new CompressedPoolingSegmentedFile.Builder(writer);
     }
 
     public abstract FileDataInput getSegment(long position);
@@ -111,6 +117,12 @@ public abstract class SegmentedFile
          */
         public abstract SegmentedFile complete(String path);
 
+        /**
+         * Called while the file is still being written, to open a SegmentedFile over only the data flushed to disk so far.
+         * @param path The file on disk.
+         */
+        public abstract SegmentedFile openEarly(String path);
+
         public void serializeBounds(DataOutput out) throws IOException
         {
             out.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
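
Every Builder now has to support the early/final split. The assumed lifecycle, sketched below, is one builder per component: openEarly() possibly many times while the writer is still appending, complete() exactly once after close. The plain-file builders treat the two calls identically (a plain file can simply be read up to its current length), while the compressed builders differ only in which CompressionMetadata snapshot they attach.

    SegmentedFile.Builder builder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
    // ... addPotentialBoundary(...) while index entries are written ...
    SegmentedFile snapshot = builder.openEarly(path);   // readable up to what is on disk now
    // ... keep writing ...
    SegmentedFile another  = builder.openEarly(path);   // may be called again as the file grows
    SegmentedFile finished = builder.complete(path);    // once, after the writer has been closed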

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 1d8b95e..7a7eb63 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -45,7 +45,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private final String filePath;
 
     protected byte[] buffer;
-    private final boolean skipIOCache;
     private final int fd;
     private final int directoryFD;
     // directory should be synced only after first file sync, in other words, only once per file
@@ -56,9 +55,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     protected final RandomAccessFile out;
 
-    // used if skip I/O cache was enabled
-    private long ioCacheStartOffset = 0, bytesSinceCacheFlush = 0;
-
     // whether to do trickling fsync() to avoid sudden bursts of dirty buffer flushing by kernel causing read
     // latency spikes
     private boolean trickleFsync;
@@ -66,8 +62,9 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     private int bytesSinceTrickleFsync = 0;
 
     public final DataOutputPlus stream;
+    protected long lastFlushOffset;
 
-    public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
+    public SequentialWriter(File file, int bufferSize)
     {
         try
         {
@@ -81,7 +78,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         filePath = file.getAbsolutePath();
 
         buffer = new byte[bufferSize];
-        this.skipIOCache = skipIOCache;
         this.trickleFsync = DatabaseDescriptor.getTrickleFsync();
         this.trickleFsyncByteInterval = DatabaseDescriptor.getTrickleFsyncIntervalInKb() * 1024;
 
@@ -100,31 +96,25 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     public static SequentialWriter open(File file)
     {
-        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, false);
-    }
-
-    public static SequentialWriter open(File file, boolean skipIOCache)
-    {
-        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache);
+        return open(file, RandomAccessReader.DEFAULT_BUFFER_SIZE);
     }
 
-    public static SequentialWriter open(File file, int bufferSize, boolean skipIOCache)
+    public static SequentialWriter open(File file, int bufferSize)
     {
-        return new SequentialWriter(file, bufferSize, skipIOCache);
+        return new SequentialWriter(file, bufferSize);
     }
 
-    public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+    public static ChecksummedSequentialWriter open(File file, File crcPath)
     {
-        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, crcPath);
     }
 
     public static CompressedSequentialWriter open(String dataFilePath,
                                                   String offsetsPath,
-                                                  boolean skipIOCache,
                                                   CompressionParameters parameters,
                                                   MetadataCollector sstableMetadataCollector)
     {
-        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
     }
 
     public void write(int value) throws ClosedChannelException
@@ -302,23 +292,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
                 }
             }
 
-            if (skipIOCache)
-            {
-                // we don't know when the data reaches disk since we aren't
-                // calling flush
-                // so we continue to clear pages we don't need from the first
-                // offset we see
-                // periodically we update this starting offset
-                bytesSinceCacheFlush += validBufferBytes;
-
-                if (bytesSinceCacheFlush >= RandomAccessReader.CACHE_FLUSH_INTERVAL_IN_BYTES)
-                {
-                    CLibrary.trySkipCache(this.fd, ioCacheStartOffset, 0);
-                    ioCacheStartOffset = bufferOffset;
-                    bytesSinceCacheFlush = 0;
-                }
-            }
-
             // Remember that we wrote, so we don't write it again on next flush().
             resetBuffer();
 
@@ -335,6 +308,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         try
         {
             out.write(buffer, 0, validBufferBytes);
+            lastFlushOffset += validBufferBytes;
         }
         catch (IOException e)
         {
@@ -431,6 +405,11 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         resetBuffer();
     }
 
+    public long getLastFlushOffset()
+    {
+        return lastFlushOffset;
+    }
+
     public void truncate(long toSize)
     {
         try
@@ -458,9 +437,6 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
         buffer = null;
 
-        if (skipIOCache && bytesSinceCacheFlush > 0)
-            CLibrary.trySkipCache(fd, 0, 0);
-
         try
         {
             out.close();