Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/19 15:31:24 UTC

[1/3] cassandra git commit: Fix regression in SSTableRewriter causing some rows to become unreadable during compaction

Repository: cassandra
Updated Branches:
  refs/heads/trunk 7783f130c -> 9a51c3c3a


Fix regression in SSTableRewriter causing some rows to become unreadable during compaction

Patch by Marcus and Benedict for CASSANDRA-8429
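
The shape of the change, as visible in the SSTableRewriter and SSTableWriter hunks below: the writer-side pair openEarly()/openAfterClose() (and the sharesBfWith() workaround on SSTableReader) is replaced by a single FinishType threaded through finish()/close(), and the rewriter now remembers each early-finished writer together with the tmplink reader it produced, swapping the final reader in via setReplacedBy() when finish() runs. Below is a minimal sketch of that pairing, with the real SSTableWriter/SSTableReader reduced to stand-in interfaces so it compiles on its own; it is an illustration of the flow, not project code.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    final class RewriterSketch
    {
        // stand-ins for SSTableWriter / SSTableReader, just enough for the sketch
        interface Writer { Reader finishEarly(); Reader finishFinal(); void abort(); }
        interface Reader { void setReplacedBy(Reader newReader); }

        // each early-opened writer is remembered together with the tmplink reader it produced
        private static final class Finished
        {
            final Writer writer;
            final Reader reader;
            Finished(Writer writer, Reader reader) { this.writer = writer; this.reader = reader; }
        }

        private final Queue<Finished> finishedEarly = new ArrayDeque<>();
        private final List<Reader> discard = new ArrayList<>();

        void switchWriter(Writer current)
        {
            // open the partially written file early and keep the (writer, reader) pair around
            finishedEarly.add(new Finished(current, current.finishEarly()));
        }

        List<Reader> finish()
        {
            List<Reader> newReaders = new ArrayList<>();
            while (!finishedEarly.isEmpty())
            {
                Finished f = finishedEarly.poll();
                if (f.reader != null)
                    discard.add(f.reader);               // tmplink reader is cleaned up later
                Reader finalReader = f.writer.finishFinal();
                if (f.reader != null)
                    f.reader.setReplacedBy(finalReader); // final reader supersedes the early one
                newReaders.add(finalReader);
            }
            return newReaders;
        }
    }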


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/871f0039
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/871f0039
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/871f0039

Branch: refs/heads/trunk
Commit: 871f0039c5bf89be343039478c64ce835b04b5cf
Parents: bedd97f
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 19 14:04:38 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 19 14:24:47 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../db/compaction/CompactionManager.java        |   3 -
 .../io/compress/CompressedSequentialWriter.java |  12 +-
 .../io/compress/CompressionMetadata.java        |  31 +--
 .../cassandra/io/sstable/SSTableReader.java     |  26 +--
 .../cassandra/io/sstable/SSTableRewriter.java   | 189 +++++++++----------
 .../cassandra/io/sstable/SSTableWriter.java     | 118 +++++++-----
 .../io/util/BufferedPoolingSegmentedFile.java   |   9 +-
 .../io/util/BufferedSegmentedFile.java          |   9 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  11 +-
 .../io/util/CompressedSegmentedFile.java        |  16 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |   8 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  12 +-
 .../io/sstable/SSTableRewriterTest.java         |  71 ++++++-
 14 files changed, 278 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e5a8f05..ac28d78 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable 
+   during compaction (CASSANDRA-8429)
  * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
  * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
    is disabled (CASSANDRA-8288)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9f5951c..872ebed 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1035,9 +1035,6 @@ public class CompactionManager implements CompactionManagerMBean
                         unrepairedKeyCount++;
                     }
                 }
-                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
-                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
                 anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
                 anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
                 cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e..d3c41fa 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
@@ -139,15 +140,10 @@ public class CompressedSequentialWriter extends SequentialWriter
         chunkOffset += compressedLength + 4;
     }
 
-    public CompressionMetadata openEarly()
+    public CompressionMetadata open(SSTableWriter.FinishType finishType)
     {
-        return metadataWriter.openEarly(originalSize, chunkOffset);
-    }
-
-    public CompressionMetadata openAfterClose()
-    {
-        assert current == originalSize;
-        return metadataWriter.openAfterClose(current, chunkOffset);
+        assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
+        return metadataWriter.open(originalSize, chunkOffset, finishType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index c922963..173722f 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -47,10 +47,10 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -217,7 +217,7 @@ public class CompressionMetadata
             throw new CorruptSSTableException(new EOFException(), indexFilePath);
 
         long chunkOffset = chunkOffsets.getLong(idx);
-        long nextChunkOffset = (idx + 8 == chunkOffsets.size())
+        long nextChunkOffset = (idx + 8 == chunkOffsetsSize)
                                 ? compressedFileLength
                                 : chunkOffsets.getLong(idx + 8);
 
@@ -319,18 +319,25 @@ public class CompressionMetadata
             }
         }
 
-        public CompressionMetadata openEarly(long dataLength, long compressedLength)
+        public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
         {
+            RefCountedMemory offsets;
+            switch (finishType)
+            {
+                case EARLY:
+                    offsets = this.offsets;
+                    break;
+                case NORMAL:
+                case FINISH_EARLY:
+                    offsets = this.offsets.copy(count * 8L);
+                    this.offsets.unreference();
+                    break;
+                default:
+                    throw new AssertionError();
+            }
             return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }
 
-        public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
-        {
-            RefCountedMemory newOffsets = offsets.copy(count * 8L);
-            offsets.unreference();
-            return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
-        }
-
         /**
          * Get a chunk offset by it's index.
          *
@@ -360,8 +367,8 @@ public class CompressionMetadata
             	out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
 	            assert chunks == count;
 	            writeHeader(out, dataLength, chunks);
-	            for (int i = 0 ; i < count ; i++)
-	                out.writeLong(offsets.getLong(i * 8));
+                for (int i = 0 ; i < count ; i++)
+                    out.writeLong(offsets.getLong(i * 8));
             }
             finally
             {
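
The open() hunk above folds the old openEarly()/openAfterClose() pair into one method keyed on FinishType: an EARLY open hands the reader the live, still-growing offsets memory, while NORMAL and FINISH_EARLY take a private copy and release the writer's reference. A standalone restatement of just that decision, with RefCountedMemory reduced to a stub so the sketch compiles on its own (not project code):

    final class OffsetsSketch
    {
        enum FinishType { NORMAL, EARLY, FINISH_EARLY }

        // stand-in for RefCountedMemory; only the share-vs-copy decision is shown
        static final class MemoryStub
        {
            MemoryStub copy(long bytes) { return new MemoryStub(); } // private snapshot for the reader
            void unreference()          { /* writer gives up its reference */ }
        }

        static MemoryStub offsetsFor(MemoryStub live, long count, FinishType finishType)
        {
            switch (finishType)
            {
                case EARLY:
                    return live;                             // still writing: reader shares the live offsets
                case NORMAL:
                case FINISH_EARLY:
                    MemoryStub copy = live.copy(count * 8L); // 8 bytes per chunk offset
                    live.unreference();
                    return copy;
                default:
                    throw new AssertionError();
            }
        }
    }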

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index bd20226..217a109 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -201,7 +201,6 @@ public class SSTableReader extends SSTable
     private Object replaceLock = new Object();
     private SSTableReader replacedBy;
     private SSTableReader replaces;
-    private SSTableReader sharesBfWith;
     private SSTableDeletingTask deletingTask;
     private Runnable runOnClose;
 
@@ -575,7 +574,7 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
+            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
 
             if (replacedBy != null)
             {
@@ -594,19 +593,11 @@ public class SSTableReader extends SSTable
                 deleteFiles &= !dfile.path.equals(replaces.dfile.path);
             }
 
-            if (sharesBfWith != null)
-            {
-                closeBf &= sharesBfWith.bf != bf;
-                closeSummary &= sharesBfWith.indexSummary != indexSummary;
-                closeFiles &= sharesBfWith.dfile != dfile;
-                deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
-            }
-
             boolean deleteAll = false;
             if (release && isCompacted.get())
             {
                 assert replacedBy == null;
-                if (replaces != null)
+                if (replaces != null && !deleteFiles)
                 {
                     replaces.replacedBy = null;
                     replaces.deletingTask = deletingTask;
@@ -936,19 +927,6 @@ public class SSTableReader extends SSTable
         }
     }
 
-    /**
-     * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
-     *
-     * note that the reason we don't use replacedBy is that we are not yet actually replaced
-     *
-     * @param newReader
-     */
-    public void sharesBfWith(SSTableReader newReader)
-    {
-        assert openReason.equals(OpenReason.EARLY);
-        this.sharesBfWith = newReader;
-    }
-
     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
     {
         synchronized (replaceLock)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index cd9435d..43ac4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -17,18 +17,11 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -38,7 +31,6 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -84,8 +76,10 @@ public class SSTableRewriter
     private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
-    private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
-    private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
+    private final Queue<Finished> finishedEarly = new ArrayDeque<>();
+    // as writers are closed from finishedEarly, their last readers are moved
+    // into discard, so that abort can cleanup after us safely
+    private final List<SSTableReader> discard = new ArrayList<>();
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
     private SSTableWriter writer;
@@ -183,42 +177,30 @@ public class SSTableRewriter
     public void abort()
     {
         switchWriter(null);
-
         moveStarts(null, Functions.forMap(originalStarts), true);
 
-        List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
-
-        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
-        {
-            // we should close the bloom filter if we have not opened an sstable reader from this
-            // writer (it will get closed when we release the sstable reference below):
-            w.left.abort(w.right == null);
-            if (isOffline && w.right != null)
-            {
-                // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
-                w.right.markObsolete();
-                w.right.releaseReference();
-            }
-        }
-
-        // also remove already completed SSTables
-        for (SSTableReader sstable : close)
-            sstable.markObsolete();
-
+        // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
             sstable.markObsolete();
             sstable.releaseReference();
         }
 
-        // releases reference in replaceReaders
-        if (!isOffline)
+        // abort the writers
+        for (Finished finished : finishedEarly)
         {
-            dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
-            dataTracker.unmarkCompacting(close);
+            boolean opened = finished.reader != null;
+            finished.writer.abort(!opened);
+            if (opened)
+            {
+                // if we've already been opened, add ourselves to the discard pile
+                discard.add(finished.reader);
+                finished.reader.markObsolete();
+            }
         }
-    }
 
+        replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+    }
 
     /**
      * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
@@ -274,8 +256,6 @@ public class SSTableRewriter
         rewriting.addAll(replaceWith);
     }
 
-
-
     private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
     {
         if (isOffline)
@@ -301,15 +281,19 @@ public class SSTableRewriter
             writer = newWriter;
             return;
         }
-        // we leave it as a tmp file, but we open it early and add it to the dataTracker
-        SSTableReader reader = writer.openEarly(maxAge);
-        if (reader != null)
+
+        // we leave it as a tmp file, but we open it and add it to the dataTracker
+        if (writer.getFilePointer() != 0)
         {
-            finishedOpenedEarly.add(reader);
+            SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
+            finishedEarly.add(new Finished(writer, reader));
+        }
+        else
+        {
+            writer.abort();
         }
-        finishedWriters.add(Pair.create(writer, reader));
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
@@ -337,85 +321,82 @@ public class SSTableRewriter
      */
     public List<SSTableReader> finish(long repairedAt)
     {
-        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+        return finishAndMaybeThrow(repairedAt, false, false);
+    }
+
+    @VisibleForTesting
+    void finishAndThrow(boolean throwEarly)
+    {
+        finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+    }
+
+    private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+    {
+        List<SSTableReader> newReaders = new ArrayList<>();
         switchWriter(null);
-        // make real sstables of the written ones:
-        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
-        while(it.hasNext())
+
+        if (throwEarly)
+            throw new RuntimeException("exception thrown early in finish, for testing");
+
+        while (!finishedEarly.isEmpty())
         {
-            Pair<SSTableWriter, SSTableReader> w = it.next();
-            if (w.left.getFilePointer() > 0)
+            Finished f = finishedEarly.poll();
+            if (f.writer.getFilePointer() > 0)
             {
-                SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
-                finished.add(newReader);
+                if (f.reader != null)
+                    discard.add(f.reader);
 
-                if (w.right != null)
-                {
-                    w.right.sharesBfWith(newReader);
-                    if (isOffline)
-                    {
-                        // remove the tmplink files if we are offline - no one is using them
-                        w.right.markObsolete();
-                        w.right.releaseReference();
-                    }
-                }
-                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                toReplace.add(Pair.create(w.right, newReader));
+                SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+
+                if (f.reader != null)
+                    f.reader.setReplacedBy(newReader);
+
+                finished.add(newReader);
+                newReaders.add(newReader);
             }
             else
             {
-                assert w.right == null;
-                w.left.abort(true);
+                f.writer.abort(true);
+                assert f.reader == null;
             }
-            it.remove();
         }
 
-        if (!isOffline)
-        {
-            for (Pair<SSTableReader, SSTableReader> replace : toReplace)
-                replaceEarlyOpenedFile(replace.left, replace.right);
-            dataTracker.unmarkCompacting(finished);
-        }
+        if (throwLate)
+            throw new RuntimeException("exception thrown after all sstables finished, for testing");
+
+        replaceWithFinishedReaders(newReaders);
         return finished;
     }
 
-    @VisibleForTesting
-    void finishAndThrow(boolean early)
+    // cleanup all our temporary readers and swap in our new ones
+    private void replaceWithFinishedReaders(List<SSTableReader> finished)
     {
-        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
-        switchWriter(null);
-        if (early)
-            throw new RuntimeException("exception thrown early in finish");
-        // make real sstables of the written ones:
-        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
-        while(it.hasNext())
+        if (isOffline)
         {
-            Pair<SSTableWriter, SSTableReader> w = it.next();
-            if (w.left.getFilePointer() > 0)
-            {
-                SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
-                finished.add(newReader);
-
-                if (w.right != null)
-                {
-                    w.right.sharesBfWith(newReader);
-                    if (isOffline)
-                    {
-                        w.right.markObsolete();
-                        w.right.releaseReference();
-                    }
-                }
-                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                toReplace.add(Pair.create(w.right, newReader));
-            }
-            else
+            for (SSTableReader reader : discard)
             {
-                assert w.right == null;
-                w.left.abort(true);
+                if (reader.getCurrentReplacement() == null)
+                    reader.markObsolete();
+                reader.releaseReference();
             }
-            it.remove();
         }
+        else
+        {
+            dataTracker.replaceEarlyOpenedFiles(discard, finished);
+            dataTracker.unmarkCompacting(discard);
+        }
+        discard.clear();
+    }
 
-        throw new RuntimeException("exception thrown after all sstables finished");
+    private static final class Finished
+    {
+        final SSTableWriter writer;
+        final SSTableReader reader;
+
+        private Finished(SSTableWriter writer, SSTableReader reader)
+        {
+            this.writer = writer;
+            this.reader = reader;
+        }
     }
 }
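
The calling pattern the rewriter expects is unchanged and is exercised by testAllKeysReadable further down: append rows, periodically switchWriter(), then finish() on success or abort() on failure. A compressed sketch of that loop, with stand-in types so it compiles on its own (tracker bookkeeping and the real method signatures are omitted):

    import java.util.Iterator;

    final class RewriterUsageSketch
    {
        interface Row {}
        interface Writer {}
        interface WriterFactory { Writer newWriter(); }
        interface Rewriter
        {
            void append(Row row);
            void switchWriter(Writer next);
            void finish();
            void abort();
        }

        static void rewrite(Rewriter rewriter, Iterator<Row> rows, WriterFactory writers, int rowsPerSSTable)
        {
            int appended = 0;
            try
            {
                while (rows.hasNext())
                {
                    rewriter.append(rows.next());
                    if (++appended % rowsPerSSTable == 0)
                        rewriter.switchWriter(writers.newWriter()); // previous writer is finished EARLY but stays readable
                }
                rewriter.finish();  // converts every early-opened writer into a final sstable
            }
            catch (RuntimeException e)
            {
                rewriter.abort();   // obsoletes tmplink readers and restores the original sstables
                throw e;
            }
        }
    }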

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ec64561..595012d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -380,6 +380,18 @@ public class SSTableWriter extends SSTable
         last = lastWrittenKey = getMinimalKey(last);
     }
 
+    private Descriptor makeTmpLinks()
+    {
+        // create temp links if they don't already exist
+        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
+        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
+        {
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
+            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
+        }
+        return link;
+    }
+
     public SSTableReader openEarly(long maxDataAge)
     {
         StatsMetadata sstableMetadata = (StatsMetadata) sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
@@ -391,17 +403,10 @@ public class SSTableWriter extends SSTable
         if (exclusiveUpperBoundOfReadableIndex == null)
             return null;
 
-        // create temp links if they don't already exist
-        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
-        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
-        {
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
-            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
-        }
-
+        Descriptor link = makeTmpLinks();
         // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
-        SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
+        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
+        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
@@ -435,6 +440,19 @@ public class SSTableWriter extends SSTable
         return sstable;
     }
 
+    public static enum FinishType
+    {
+        NORMAL(SSTableReader.OpenReason.NORMAL),
+        EARLY(SSTableReader.OpenReason.EARLY), // no renaming
+        FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up an EARLY finish
+        final SSTableReader.OpenReason openReason;
+
+        FinishType(SSTableReader.OpenReason openReason)
+        {
+            this.openReason = openReason;
+        }
+    }
+
     public SSTableReader closeAndOpenReader()
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -442,68 +460,84 @@ public class SSTableWriter extends SSTable
 
     public SSTableReader closeAndOpenReader(long maxDataAge)
     {
-        return closeAndOpenReader(maxDataAge, this.repairedAt);
+        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
     }
 
-    public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
+    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
     {
-        Pair<Descriptor, StatsMetadata> p = close(repairedAt);
-        Descriptor newdesc = p.left;
-        StatsMetadata sstableMetadata = p.right;
+        Pair<Descriptor, StatsMetadata> p;
+
+        p = close(finishType, repairedAt);
+        Descriptor desc = p.left;
+        StatsMetadata metadata = p.right;
+
+        if (finishType == FinishType.EARLY)
+            desc = makeTmpLinks();
 
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
-        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc,
+        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
+        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
                                                            components,
-                                                           metadata,
+                                                           this.metadata,
                                                            partitioner,
                                                            ifile,
                                                            dfile,
                                                            iwriter.summary.build(partitioner),
                                                            iwriter.bf,
                                                            maxDataAge,
-                                                           sstableMetadata,
-                                                           SSTableReader.OpenReason.NORMAL);
+                                                           metadata,
+                                                           finishType.openReason);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
-        // try to save the summaries to disk
-        sstable.saveSummary(iwriter.builder, dbuilder);
-        iwriter = null;
-        dbuilder = null;
+
+        switch (finishType)
+        {
+            case NORMAL: case FINISH_EARLY:
+            // try to save the summaries to disk
+            sstable.saveSummary(iwriter.builder, dbuilder);
+            iwriter = null;
+            dbuilder = null;
+        }
         return sstable;
     }
 
     // Close the writer and return the descriptor to the new sstable and it's metadata
     public Pair<Descriptor, StatsMetadata> close()
     {
-        return close(this.repairedAt);
+        return close(FinishType.NORMAL, this.repairedAt);
     }
 
-    private Pair<Descriptor, StatsMetadata> close(long repairedAt)
+    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
     {
+        switch (type)
+        {
+            case EARLY: case NORMAL:
+            iwriter.close();
+            dataFile.close();
+        }
 
-        // index and filter
-        iwriter.close();
-        // main data, close will truncate if necessary
-        dataFile.close();
-        dataFile.writeFullChecksum(descriptor);
         // write sstable statistics
-        Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
-                                                                                    partitioner.getClass().getCanonicalName(),
-                                                                                    metadata.getBloomFilterFpChance(),
-                                                                                    repairedAt);
-        writeMetadata(descriptor, metadataComponents);
-
-        // save the table of components
-        SSTable.appendTOC(descriptor, components);
+        Map<MetadataType, MetadataComponent> metadataComponents ;
+        metadataComponents = sstableMetadataCollector
+                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                               metadata.getBloomFilterFpChance(),repairedAt);
 
         // remove the 'tmp' marker from all components
-        return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+        Descriptor descriptor = this.descriptor;
+        switch (type)
+        {
+            case NORMAL: case FINISH_EARLY:
+            dataFile.writeFullChecksum(descriptor);
+            writeMetadata(descriptor, metadataComponents);
+            // save the table of components
+            SSTable.appendTOC(descriptor, components);
+            descriptor = rename(descriptor, components);
+        }
 
+        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
     }
 
-
     private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
         SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
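
For reference, the two switch statements in finish()/close() above divide the work between the three FinishType values: EARLY closes the index and data files and opens the reader against TEMPLINK hard links with OpenReason.EARLY, without writing the checksum, metadata, TOC or renaming; NORMAL does everything in one pass; FINISH_EARLY skips re-closing the already-closed files but performs the final on-disk steps and saves the summary. A small enum restating that split, derived from the hunks above (not project code):

    enum FinishTypeSteps
    {
        //            closeFiles  finalizeOnDisk  openFromTmpLinks  saveSummary
        NORMAL       (true,       true,           false,            true),
        EARLY        (true,       false,          true,             false),
        FINISH_EARLY (false,      true,           false,            true);

        final boolean closeFiles;       // iwriter.close() / dataFile.close()
        final boolean finalizeOnDisk;   // checksum, metadata, TOC, rename away from tmp
        final boolean openFromTmpLinks; // reader is opened against TEMPLINK hard links
        final boolean saveSummary;      // summary persisted and builders released

        FinishTypeSteps(boolean closeFiles, boolean finalizeOnDisk, boolean openFromTmpLinks, boolean saveSummary)
        {
            this.closeFiles = closeFiles;
            this.finalizeOnDisk = finalizeOnDisk;
            this.openFromTmpLinks = openFromTmpLinks;
            this.saveSummary = saveSummary;
        }
    }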

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61..57f465f 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
 public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
 {
     public BufferedPoolingSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             return new BufferedPoolingSegmentedFile(path, length);
         }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
     }
 
     protected RandomAccessReader createReader(String path)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3..2f715da 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.util;
 
 import java.io.File;
 
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
 public class BufferedSegmentedFile extends SegmentedFile
 {
     public BufferedSegmentedFile(String path, long length)
@@ -33,16 +35,11 @@ public class BufferedSegmentedFile extends SegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             return new BufferedSegmentedFile(path, length);
         }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
     }
 
     public FileDataInput getSegment(long position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69..11d091a 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
 {
@@ -43,17 +44,11 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
-            return new CompressedPoolingSegmentedFile(path, metadata(path, false));
-        }
-
-        public SegmentedFile openEarly(String path)
-        {
-            return new CompressedPoolingSegmentedFile(path, metadata(path, true));
+            return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
         }
     }
-
     protected RandomAccessReader createReader(String path)
     {
         return CompressedRandomAccessReader.open(path, metadata, this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0..b788715 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.util;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 
 public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
 {
@@ -44,24 +45,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
             // only one segment in a standard-io file
         }
 
-        protected CompressionMetadata metadata(String path, boolean early)
+        protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
         {
             if (writer == null)
                 return CompressionMetadata.create(path);
-            else if (early)
-                return writer.openEarly();
-            else
-                return writer.openAfterClose();
-        }
 
-        public SegmentedFile complete(String path)
-        {
-            return new CompressedSegmentedFile(path, metadata(path, false));
+            return writer.open(finishType);
         }
 
-        public SegmentedFile openEarly(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
-            return new CompressedSegmentedFile(path, metadata(path, true));
+            return new CompressedSegmentedFile(path, metadata(path, finishType));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc..3b2cc98 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
@@ -160,18 +161,13 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
         }
 
-        public SegmentedFile complete(String path)
+        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
         {
             long length = new File(path).length();
             // create the segments
             return new MmappedSegmentedFile(path, length, createSegments(path));
         }
 
-        public SegmentedFile openEarly(String path)
-        {
-            return complete(path);
-        }
-
         private Segment[] createSegments(String path)
         {
             RandomAccessFile raf;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6..badae56 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -115,13 +116,12 @@ public abstract class SegmentedFile
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param path The file on disk.
          */
-        public abstract SegmentedFile complete(String path);
+        public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType);
 
-        /**
-         * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
-         * @param path The file on disk.
-         */
-        public abstract SegmentedFile openEarly(String path);
+        public SegmentedFile complete(String path)
+        {
+            return complete(path, SSTableWriter.FinishType.NORMAL);
+        }
 
         public void serializeBounds(DataOutput out) throws IOException
         {
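
With openEarly() gone, builders implement only complete(path, finishType); the new concrete complete(path) overload forwards FinishType.NORMAL so existing call sites keep their behaviour. A minimal sketch of a builder against stand-in types, just to show the two entry points (illustration only, not project code):

    final class SegmentedFileSketch
    {
        enum FinishType { NORMAL, EARLY, FINISH_EARLY }

        interface Segments {}
        static final class SegmentsStub implements Segments {}

        static abstract class Builder
        {
            public abstract Segments complete(String path, FinishType finishType);

            public Segments complete(String path)
            {
                return complete(path, FinishType.NORMAL); // existing callers keep this behaviour
            }
        }

        static final class BufferedBuilder extends Builder
        {
            public Segments complete(String path, FinishType finishType)
            {
                // buffered files need no special handling for early opens
                return new SegmentsStub();
            }
        }
    }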

http://git-wip-us.apache.org/repos/asf/cassandra/blob/871f0039/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6f9acea..392936d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -483,7 +483,6 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 400);
-        DecoratedKey origFirst = s.first;
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(1000000);
@@ -499,8 +498,7 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
                 {
-                    assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
-                    assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                    assertEquals(files, cfs.getSSTables().size()); // all files are now opened early
                     rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                     files++;
                 }
@@ -616,6 +614,73 @@ public class SSTableRewriterTest extends SchemaLoader
 
     }
 
+    @Test
+    public void testAllKeysReadable() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        for (int i = 0; i < 100; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            Mutation rm = new Mutation(KEYSPACE, key.getKey());
+            for (int j = 0; j < 10; j++)
+                rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        cfs.forceMajorCompaction();
+        validateKeys(keyspace);
+
+        assertEquals(1, cfs.getSSTables().size());
+        SSTableReader s = cfs.getSSTables().iterator().next();
+        Set<SSTableReader> compacting = new HashSet<>();
+        compacting.add(s);
+        cfs.getDataTracker().markCompacting(compacting);
+
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter.overrideOpenInterval(1);
+        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+        rewriter.switchWriter(w);
+        int keyCount = 0;
+        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            while (scanner.hasNext())
+            {
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (keyCount % 10 == 0)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                }
+                keyCount++;
+                validateKeys(keyspace);
+            }
+            try
+            {
+                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+                cfs.getDataTracker().unmarkCompacting(compacting);
+            }
+            catch (Throwable t)
+            {
+                rewriter.abort();
+            }
+        }
+        validateKeys(keyspace);
+        Thread.sleep(1000);
+        validateCFS(cfs);
+    }
+
+    private void validateKeys(Keyspace ks)
+    {
+        for (int i = 0; i < 100; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+            assertTrue(cf != null);
+        }
+    }
+
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);


[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
	src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a51c3c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a51c3c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a51c3c3

Branch: refs/heads/trunk
Commit: 9a51c3c3a891f2a22189b53db48b767b9a0400ca
Parents: 7783f13 871f003
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 19 14:31:06 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 19 14:31:06 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../io/compress/CompressedSequentialWriter.java |  12 +-
 .../io/compress/CompressionMetadata.java        |  30 +--
 .../cassandra/io/sstable/SSTableRewriter.java   | 189 +++++++++----------
 .../io/sstable/format/SSTableReader.java        |  26 +--
 .../io/sstable/format/SSTableWriter.java        |  17 +-
 .../io/sstable/format/big/BigTableWriter.java   | 113 +++++++----
 .../io/util/BufferedPoolingSegmentedFile.java   |   9 +-
 .../io/util/BufferedSegmentedFile.java          |   9 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  11 +-
 .../io/util/CompressedSegmentedFile.java        |  16 +-
 .../cassandra/io/util/MmappedSegmentedFile.java |   8 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  12 +-
 .../io/sstable/SSTableRewriterTest.java         |  77 +++++++-
 14 files changed, 290 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5ec1bd2,ac28d78..c91f202
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,47 -1,6 +1,49 @@@
 +3.0
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Fix regression in SSTableRewriter causing some rows to become unreadable 
+    during compaction (CASSANDRA-8429)
   * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
   * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
     is disabled (CASSANDRA-8288)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index e533b1e,d3c41fa..e875ee3
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@@ -27,6 -27,7 +27,7 @@@ import org.apache.cassandra.io.FSReadEr
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.util.DataIntegrityMetadata;
  import org.apache.cassandra.io.util.FileMark;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index dfbc50f,173722f..221b3c1
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@@ -48,7 -47,7 +48,8 @@@ import org.apache.cassandra.io.IVersion
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.io.util.Memory;
@@@ -322,18 -319,25 +323,25 @@@ public class CompressionMetadat
              }
          }
  
-         public CompressionMetadata openEarly(long dataLength, long compressedLength)
+         public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
          {
+             RefCountedMemory offsets;
+             switch (finishType)
+             {
+                 case EARLY:
+                     offsets = this.offsets;
+                     break;
+                 case NORMAL:
+                 case FINISH_EARLY:
+                     offsets = this.offsets.copy(count * 8L);
+                     this.offsets.unreference();
+                     break;
+                 default:
+                     throw new AssertionError();
+             }
 -            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
 +            return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
          }
  
-         public CompressionMetadata openAfterClose(long dataLength, long compressedLength)
-         {
-             RefCountedMemory newOffsets = offsets.copy(count * 8L);
-             offsets.unreference();
-             return new CompressionMetadata(filePath, parameters, newOffsets, count * 8L, dataLength, compressedLength, latestVersion.hasPostCompressionAdlerChecksums());
-         }
- 
          /**
           * Get a chunk offset by it's index.
           *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8b25f59,43ac4b6..e7a28d3
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -36,11 -29,8 +29,10 @@@ import org.apache.cassandra.db.DataTrac
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.CLibrary;
  import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.Pair;
  
  /**
   * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ff85320,0000000..41898c8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1900 -1,0 +1,1878 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 +import org.apache.cassandra.io.compress.CompressedThrottledReader;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public abstract class SSTableReader extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
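 +
 +    // Illustrative sketch (assumed derivation, not part of this commit): per the javadoc above,
 +    // a freshly flushed sstable would simply use the current time, while a compaction result
 +    // would take the max over its inputs, roughly:
 +    //
 +    //     long maxDataAge = System.currentTimeMillis();             // freshly flushed sstable
 +    //
 +    //     long maxDataAge = 0;                                      // compaction result
 +    //     for (SSTableReader merged : inputs)                       // `inputs` is a hypothetical name
 +    //         maxDataAge = Math.max(maxDataAge, merged.maxDataAge);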
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    protected final AtomicInteger references = new AtomicInteger(1);
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    /**
 +     * To support replacing this SSTableReader with another object that represents the same underlying sstable, but with different associated resources,
 +     * we build a linked-list chain of replacements, synchronised via a shared lock object so that maintaining the list across multiple threads stays simple.
 +     * On close we check which closeable resources differ from the links on either side of us; any resource held by neither adjacent link (if any) is closed.
 +     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead of us compares against only still-open resources.
 +     */
 +    protected Object replaceLock = new Object();
 +    protected SSTableReader replacedBy;
 +    private SSTableReader replaces;
-     private SSTableReader sharesBfWith;
 +    private SSTableDeletingTask deletingTask;
 +    private Runnable runOnClose;
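 +
 +    // Descriptive example of the replacement chain above (assumed scenario, not part of this
 +    // commit): if an early-opened reader A is later replaced by B, then A.replacedBy == B and
 +    // B.replaces == A, and both share the same replaceLock. When A is closed, any resource
 +    // (bf, indexSummary, dfile/ifile) that B still points at is left open, and the on-disk
 +    // files are only deleted when the two readers reference different paths; see tidy() below.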
 +
 +    @VisibleForTesting
 +    public RestorableMeter readMeter;
 +    protected ScheduledFuture readMeterSyncFuture;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If a cardinality estimator is available for all of the given sstables, this method uses them to estimate
 +     * the key count.
 +     * If not, it falls back to the index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summaries
 +        if (count < 0)
 +        {
 +            count = 0; // reset the -1 sentinel so the fallback sum is not off by one
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
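 +
 +    // Hypothetical usage sketch (not part of this commit): callers typically pass the live
 +    // sstable set and treat the result as an estimate, e.g.
 +    //
 +    //     long estimate = SSTableReader.getApproximateKeyCount(cfs.getSSTables());
 +    //
 +    // where `cfs` is assumed to be a ColumnFamilyStore; merged cardinality estimators are
 +    // preferred, with summed index-summary estimates as the fallback.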
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open an SSTable reader to be used in batch mode (such as by sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check whether the sstable was created using the same partitioner.
 +        // The partitioner can be null, which indicates an older sstable version or that no stats are available;
 +        // in that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +
 +        return sstable;
 +    }
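 +
 +    // Hypothetical usage sketch (assumed caller such as sstableloader, not part of this
 +    // commit); the path and filename below are illustrative only:
 +    //
 +    //     Descriptor desc = Descriptor.fromFilename("/path/to/ks-cf-ka-1-Data.db");
 +    //     SSTableReader reader = SSTableReader.openForBatch(desc, componentsFor(desc),
 +    //                                                       metadata, partitioner);
 +    //
 +    // note that batch mode skips the bloom filter (AlwaysPresent) and uses non-pooled
 +    // SegmentedFile builders, as shown above.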
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check whether the sstable was created using the same partitioner.
 +        // The partitioner can be null, which indicates an older sstable version or that no stats are available;
 +        // in that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +
 +        deletingTask = new SSTableDeletingTask(this);
 +
 +        // Don't track read rates for tables in the system keyspace.  Also don't track reads for special operations (like early open);
 +        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
 +        if (SystemKeyspace.NAME.equals(desc.ksname) || openReason != OpenReason.NORMAL)
 +        {
 +            readMeter = null;
 +            readMeterSyncFuture = null;
 +            return;
 +        }
 +
 +        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (!isCompacted.get())
 +                {
 +                    meterSyncThrottle.acquire();
 +                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                }
 +            }
 +        }, 1, 5, TimeUnit.MINUTES);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    private void tidy(boolean release)
 +    {
 +        if (readMeterSyncFuture != null)
 +            readMeterSyncFuture.cancel(false);
 +
 +        if (references.get() != 0)
 +        {
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +        }
 +
 +        synchronized (replaceLock)
 +        {
-             boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
++            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = isCompacted.get();
 +
 +            if (replacedBy != null)
 +            {
 +                closeBf = replacedBy.bf != bf;
 +                closeSummary = replacedBy.indexSummary != indexSummary;
 +                closeFiles = replacedBy.dfile != dfile;
 +                // if the replacement sstablereader uses a different path, clean up our paths
 +                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
 +            }
 +
 +            if (replaces != null)
 +            {
 +                closeBf &= replaces.bf != bf;
 +                closeSummary &= replaces.indexSummary != indexSummary;
 +                closeFiles &= replaces.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
 +            }
 +
-             if (sharesBfWith != null)
-             {
-                 closeBf &= sharesBfWith.bf != bf;
-                 closeSummary &= sharesBfWith.indexSummary != indexSummary;
-                 closeFiles &= sharesBfWith.dfile != dfile;
-                 deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
-             }
- 
 +            boolean deleteAll = false;
 +            if (release && isCompacted.get())
 +            {
 +                assert replacedBy == null;
-                 if (replaces != null)
++                if (replaces != null && !deleteFiles)
 +                {
 +                    replaces.replacedBy = null;
 +                    replaces.deletingTask = deletingTask;
 +                    replaces.markObsolete();
 +                }
 +                else
 +                {
 +                    deleteAll = true;
 +                }
 +            }
 +            else
 +            {
 +                if (replaces != null)
 +                    replaces.replacedBy = replacedBy;
 +                if (replacedBy != null)
 +                    replacedBy.replaces = replaces;
 +            }
 +
 +            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
 +        }
 +    }
 +
 +    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
 +    {
 +        if (references.get() != 0)
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +
 +        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +        final OpOrder.Barrier barrier;
 +        if (cfs != null)
 +        {
 +            barrier = cfs.readOrdering.newBarrier();
 +            barrier.issue();
 +        }
 +        else
 +            barrier = null;
 +
 +        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (barrier != null)
 +                    barrier.await();
 +                if (closeBf)
 +                    bf.close();
 +                if (closeSummary)
 +                    indexSummary.close();
 +                if (closeFiles)
 +                {
 +                    ifile.cleanup();
 +                    dfile.cleanup();
 +                }
 +                if (runOnClose != null)
 +                    runOnClose.run();
 +                if (deleteAll)
 +                {
 +                    /**
 +                     * Do the OS a favour and suggest (using an fadvise call) that we
 +                     * don't want to see pages of this SSTable in memory anymore.
 +                     *
 +                     * NOTE: We can't use madvise in java because it requires the address of
 +                     * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
 +                     */
 +                    dropPageCache();
 +                    deletingTask.run();
 +                }
 +                else if (deleteFiles)
 +                {
 +                    FileUtils.deleteWithConfirm(new File(dfile.path));
 +                    FileUtils.deleteWithConfirm(new File(ifile.path));
 +                }
 +            }
 +        });
 +    }
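 +
 +    // Note on the barrier above (assumed reading, not part of this commit): issuing the
 +    // OpOrder.Barrier before scheduling the cleanup, and await()ing it inside the task,
 +    // ensures that every read which started against cfs.readOrdering before the barrier has
 +    // completed before the bloom filter, index summary and file handles are released.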
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // bf is enabled, but the fp chance recorded in the sstable metadata no longer matches the configured value, so rebuild the filter.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null;
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey;
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey.getKey());
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * If the loaded index summary has a different index interval from the current value stored in the schema,
 +     * then the Summary.db file is deleted and this method returns false so the summary can be rebuilt.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; close the stream, delete the file and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // the save failed, so delete the (possibly partial) file and let the summary be rebuilt on the next load.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +            replacedBy = replacement;
 +            replacement.replaces = this;
 +            replacement.replaceLock = replaceLock;
 +        }
 +    }
 +
-     /**
-      * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
-      *
-      * note that the reason we don't use replacedBy is that we are not yet actually replaced
-      *
-      * @param newReader
-      */
-     public void sharesBfWith(SSTableReader newReader)
-     {
-         assert openReason.equals(OpenReason.EARLY);
-         this.sharesBfWith = newReader;
-     }
- 
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            if (newStart.compareTo(this.first) > 0)
 +            {
 +                if (newStart.compareTo(this.last) > 0)
 +                {
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, 0);
 +                            CLibrary.trySkipCache(ifile.path, 0, 0);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +                else
 +                {
 +                    final long dataStart = getPosition(newStart, Operator.GE).position;
 +                    final long indexStart = getIndexScanPosition(newStart);
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +            }
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
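 +
 +    // Assumed usage note (not part of this commit): an SSTableRewriter can narrow an
 +    // early-opened original as its replacement writer catches up, e.g. roughly
 +    //
 +    //     SSTableReader narrowed = original.cloneWithNewStart(newFirstKey, runOnClose);
 +    //
 +    // so that, once the old reader is finally closed, the page cache for the prefix that has
 +    // already been rewritten can be dropped via the trySkipCache calls above.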
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
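 +
 +    // Worked example of the branch above (assumed values, not part of this commit): raising
 +    // the sampling level (say 64 -> 128) or changing min_index_interval forces a reread of
 +    // Index.db via buildSummaryAtLevel, while lowering the level (say 128 -> 64) simply
 +    // downsamples the existing in-memory summary and persists the smaller result.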
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex);
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() throws IOException
 +    {
 +        indexSummary.close();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary).
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return -1;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
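 +
 +    // Worked example (not part of this commit): binarySearch follows the JDK convention of
 +    // returning -(insertionPoint) - 1 on a miss. A result of -1 decodes to insertion point 0,
 +    // so no summary entry is <= the key and -1 is returned; a result of -3 decodes to
 +    // insertion point 2, so index 1 (the last entry <= the key) is returned; non-negative
 +    // results are exact matches and are returned unchanged.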
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        //We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the compression meta-data.
 +     * @return the amount of memory in bytes used off heap by the compression meta-data
 +     */
 +    public long getCompressionMetadataOffHeapSize()
 +    {
 +        if (!compression)
 +            return 0;
 +
 +        return getCompressionMetadata().offHeapSize();
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the bloom filter.
 +     * @return the amount of memory in bytes used off heap by the bloom filter
 +     */
 +    public long getBloomFilterOffHeapSize()
 +    {
 +        return bf.offHeapSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
 +        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
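 +
 +    // Worked example (assumed values, not part of this commit): at full sampling
 +    // (samplingLevel == BASE_SAMPLING_LEVEL) with min_index_interval = 128, ranges covering
 +    // 10 summary entries estimate 10 * 128 = 1280 keys; at half the sampling level the same
 +    // 10 entries would estimate 10 * 128 * 2 = 2560 keys.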
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // ranges are start-exclusive on the left
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end-inclusive, so we use the index before the one binarySearch gives us,
 +                // since that will be the last index we return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            assert !range.isWrapAround() || range.right.isMinimum();
 +            // truncate the range so it at most covers the sstable
 +            AbstractBounds<RowPosition> bounds = range.toRowBounds();
 +            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
 +            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
 +
 +            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
 +                continue;
 +
 +            long left = getPosition(leftBound, Operator.GT).position;
 +            long right = (rightBound.compareTo(last) > 0)
 +                         ? (openReason == OpenReason.EARLY
 +                            // if opened early, we overlap with the old sstables by one key, so we know that the last
 +                            // (and further) key(s) will be streamed from these if necessary
 +                            ? getPosition(last.getToken().maxKeyBound(), Operator.GT).position
 +                            : uncompressedLength())
 +                         : getPosition(rightBound, Operator.GT).position;
 +
 +            if (left == right)
 +                // empty range
 +                continue;
 +
 +            assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0)
 +        {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                    keyCacheHit.incrementAndGet();
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
 +
 +    // Corresponds to a query by column names
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    // Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        long sampledPosition = getIndexScanPosition(token);
 +        if (sampledPosition == -1)
 +            sampledPosition = 0;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * length())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    public boolean acquireReference()
 +    {
 +        while (true)
 +        {
 +            int n = references.get();
 +            if (n <= 0)
 +                return false;
 +            if (references.compareAndSet(n, n + 1))
 +                return true;
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public int referenceCount()
 +    {
 +        return references.get();
 +    }
 +
 +    /**
 +     * Release reference to this SSTableReader.
 +     * If nothing else refers to this SSTable and it is marked as compacted,
 +     * all resources are cleaned up and files are deleted eventually.
 +     */
 +    public void releaseReference()
 +    {
 +        if (references.decrementAndGet() == 0)
 +            tidy(true);
 +        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null : getFilename();
 +        }
 +        return !isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * I/O SSTableScanner over the full data file.
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ISSTableScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in local System.currentTimeMillis() time).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable, measured in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        synchronized (replaceLock)
 +        {
 +            SSTableReader cur = this, next = replacedBy;
 +            while (next != null)
 +            {
 +                cur = next;
 +                next = next.replacedBy;
 +            }
 +            return cur;
 +        }
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return compression
 +                ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
 +                : ThrottledReader.open(new File(getFilename()), limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return compression
 +                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
 +                : RandomAccessReader.open(new File(getFilename()));
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return RandomAccessReader.open(new File(getIndexFilename()));
 +    }
 +
 +    /**
 +     * @param component component to get timestamp.
 +     * @return last modified time for the given component, or 0 if the component does not exist or an IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * @param sstables the sstables to acquire references on
 +     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
 +     */
 +    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    {
 +        SSTableReader failed = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (!sstable.acquireReference())
 +            {
 +                failed = sstable;
 +                break;
 +            }
 +        }
 +
 +        if (failed == null)
 +            return true;
 +
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable == failed)
 +                break;
 +            sstable.releaseReference();
 +        }
 +        return false;
 +    }
 +
 +    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    {
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sstable.releaseReference();
 +        }
 +    }
 +
 +    private void dropPageCache()
 +    {
 +        dropPageCache(dfile.path);
 +        dropPageCache(ifile.path);
 +    }
 +
 +    private void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}
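
A note on the reference-counting contract above: callers are expected to pair acquireReference()/acquireReferences() with releaseReference()/releaseReferences(), since the last release of an sstable that has been marked compacted is what triggers tidy-up and eventual file deletion. A minimal sketch of that pattern follows; it is illustrative only and not part of this patch (the class name and the per-sstable work are hypothetical, and the import path assumes the trunk format package used elsewhere in this diff):

    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class ReferenceExample
    {
        // Hold references for the duration of a read so files cannot be deleted underneath us.
        static void readWithReferences(Iterable<SSTableReader> sstables)
        {
            // acquireReferences() either references every sstable or none of them.
            if (!SSTableReader.acquireReferences(sstables))
                return; // at least one sstable was already released/compacted; caller may retry

            try
            {
                for (SSTableReader reader : sstables)
                    reader.incrementReadCount(); // stand-in for real per-sstable work
            }
            finally
            {
                SSTableReader.releaseReferences(sstables);
            }
        }
    }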


[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index c017530,0000000..32b5520
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -1,202 -1,0 +1,215 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.io.sstable.format;
 +
 +import com.google.common.collect.Sets;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.Pair;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.DataInput;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.HashSet;
 +import java.util.Set;
 +
 +/**
 + * This is the API all table writers must implement.
 + *
 + * SSTableWriter.create() is the primary way to create a writer for a particular format.
 + * The format information is part of the Descriptor.
 + */
 +public abstract class SSTableWriter extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
 +
++    public static enum FinishType
++    {
++        NORMAL(SSTableReader.OpenReason.NORMAL),
++        EARLY(SSTableReader.OpenReason.EARLY), // no renaming of tmp components
++        FINISH_EARLY(SSTableReader.OpenReason.NORMAL); // tidy up (rename/finalize) a writer previously finished EARLY
++        public final SSTableReader.OpenReason openReason;
++
++        FinishType(SSTableReader.OpenReason openReason)
++        {
++            this.openReason = openReason;
++        }
++    }
++
 +    protected final long repairedAt;
 +    protected final long keyCount;
 +    protected final MetadataCollector metadataCollector;
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, components(metadata), metadata, partitioner);
 +        this.keyCount = keyCount;
 +        this.repairedAt = repairedAt;
 +        this.metadataCollector = metadataCollector;
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +    }
 +
 +    public static SSTableWriter create(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata,  IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        Factory writerFactory = descriptor.getFormat().getWriterFactory();
 +        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +    }
 +
 +    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt)
 +    {
 +        return create(descriptor, keyCount, repairedAt, 0);
 +    }
 +
 +    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel)
 +    {
 +        CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
 +        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
 +
 +        return create(descriptor, keyCount, repairedAt, metadata, DatabaseDescriptor.getPartitioner(), collector);
 +    }
 +
 +    public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel)
 +    {
 +        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel);
 +    }
 +
 +    public static SSTableWriter create(String filename, long keyCount, long repairedAt)
 +    {
 +        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0);
 +    }
 +
 +    private static Set<Component> components(CFMetaData metadata)
 +    {
 +        Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
 +                Component.PRIMARY_INDEX,
 +                Component.STATS,
 +                Component.SUMMARY,
 +                Component.TOC,
 +                Component.DIGEST));
 +
 +        if (metadata.getBloomFilterFpChance() < 1.0)
 +            components.add(Component.FILTER);
 +
 +        if (metadata.compressionParameters().sstableCompressor != null)
 +        {
 +            components.add(Component.COMPRESSION_INFO);
 +        }
 +        else
 +        {
 +            // it would feel safer to actually add this component later in maybeWriteDigest(),
 +            // but the components are unmodifiable after construction
 +            components.add(Component.CRC);
 +        }
 +        return components;
 +    }
 +
 +
 +    public abstract void mark();
 +
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public abstract RowIndexEntry append(AbstractCompactedRow row);
 +
 +    public abstract void append(DecoratedKey decoratedKey, ColumnFamily cf);
 +
 +    public abstract long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException;
 +
 +    public abstract long getFilePointer();
 +
 +    public abstract long getOnDiskFilePointer();
 +
 +    public abstract void isolateReferences();
 +
 +    public abstract void resetAndTruncate();
 +
 +    public SSTableReader closeAndOpenReader()
 +    {
 +        return closeAndOpenReader(System.currentTimeMillis());
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge)
 +    {
-         return closeAndOpenReader(maxDataAge, repairedAt);
++        return finish(FinishType.NORMAL, maxDataAge, repairedAt);
 +    }
 +
-     public abstract SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt);
++    public abstract SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt);
 +
 +    public abstract SSTableReader openEarly(long maxDataAge);
 +
 +    // Close the writer and return the descriptor to the new sstable and its metadata
 +    public abstract Pair<Descriptor, StatsMetadata> close();
 +
 +
 +
 +    public static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
 +    {
 +        Descriptor newdesc = tmpdesc.asType(Descriptor.Type.FINAL);
 +        rename(tmpdesc, newdesc, components);
 +        return newdesc;
 +    }
 +
 +    public static void rename(Descriptor tmpdesc, Descriptor newdesc, Set<Component> components)
 +    {
 +        for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
 +        {
 +            FileUtils.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
 +        }
 +
 +        // do -Data last because -Data present should mean the sstable was completely renamed before crash
 +        FileUtils.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
 +
 +        // rename it without confirmation because summary can be available for loadNewSSTables but not for closeAndOpenReader
 +        FileUtils.renameWithOutConfirm(tmpdesc.filenameFor(Component.SUMMARY), newdesc.filenameFor(Component.SUMMARY));
 +    }
 +
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort()
 +    {
 +        abort(true);
 +    }
 +
 +    public abstract void abort(boolean closeBf);
 +
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
 +    }
 +}
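
The FinishType enum introduced above ties each way of completing a writer to the OpenReason of the reader it produces: NORMAL and FINISH_EARLY rename the tmp components and open the reader normally, while EARLY leaves the tmp components in place and serves the reader through tmplink hard links. The common, non-incremental path looks roughly like the sketch below; it is illustrative only, uses only methods declared in this file, and the filename/keyCount/repairedAt values are placeholders:

    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.io.sstable.format.SSTableWriter;

    final class WriterFinishExample
    {
        // Write rows, then finish NORMALly: tmp components are renamed and the
        // resulting reader is opened with OpenReason.NORMAL.
        static SSTableReader writeAndFinish(String filename, long keyCount, long repairedAt)
        {
            SSTableWriter writer = SSTableWriter.create(filename, keyCount, repairedAt);
            boolean finished = false;
            try
            {
                // ... writer.append(...) calls for the data being written would go here ...
                SSTableReader reader = writer.finish(SSTableWriter.FinishType.NORMAL, System.currentTimeMillis(), repairedAt);
                finished = true;
                return reader;
            }
            finally
            {
                if (!finished)
                    writer.abort(); // clean up temp components on failure
            }
        }
    }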

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 7c68c8a,0000000..c52184b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,557 -1,0 +1,588 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.Closeable;
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        iwriter = new IndexWriter(keyCount);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
 +        iwriter.append(decoratedKey, index);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long currentPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(currentPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
 +        afterAppend(row.key, currentPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This writes the atom to disk too
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort(boolean closeBf)
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
 +        if (iwriter != null)
 +        {
 +            FileUtils.closeQuietly(iwriter.indexFile);
 +            if (closeBf)
 +            {
 +                iwriter.bf.close();
 +            }
 +        }
 +        if (dataFile != null)
 +            FileUtils.closeQuietly(dataFile);
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure that any managed data we may have retained references to during the write is no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
++    private Descriptor makeTmpLinks()
++    {
++        // create temp links if they don't already exist
++        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
++        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
++        {
++            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
++            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
++        }
++        return link;
++    }
++
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
 +        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
 +        if (exclusiveUpperBoundOfReadableIndex == null)
 +            return null;
 +
-         // create temp links if they don't already exist
-         Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
-         if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
-         {
-             FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
-             FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
-         }
- 
++        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
-         SegmentedFile ifile = iwriter.builder.openEarly(link.filenameFor(Component.PRIMARY_INDEX));
-         SegmentedFile dfile = dbuilder.openEarly(link.filenameFor(Component.DATA));
++        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
++        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
 +                                                           iwriter.bf, maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
 +        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
 +        if (inclusiveUpperBoundOfReadableData == null)
 +        {
 +            // Prevent leaving tmplink files on disk
 +            sstable.releaseReference();
 +            return null;
 +        }
 +        int offset = 2;
 +        while (true)
 +        {
 +            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
 +            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
 +                break;
 +            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
 +            if (inclusiveUpperBoundOfReadableData == null)
 +            {
 +                sstable.releaseReference();
 +                return null;
 +            }
 +        }
 +        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
 +        return sstable;
 +    }
 +
-     public SSTableReader closeAndOpenReader(long maxDataAge, long repairedAt)
++    public SSTableReader closeAndOpenReader()
 +    {
-         Pair<Descriptor, StatsMetadata> p = close(repairedAt);
-         Descriptor newdesc = p.left;
-         StatsMetadata sstableMetadata = p.right;
++        return closeAndOpenReader(System.currentTimeMillis());
++    }
++
++    public SSTableReader closeAndOpenReader(long maxDataAge)
++    {
++        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
++    }
++
++    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
++    {
++        Pair<Descriptor, StatsMetadata> p;
++
++        p = close(finishType, repairedAt);
++        Descriptor desc = p.left;
++        StatsMetadata metadata = p.right;
++
++        if (finishType == FinishType.EARLY)
++            desc = makeTmpLinks();
 +
 +        // finalize in-memory state for the reader
-         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
-         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(Component.DATA));
-         SSTableReader sstable = SSTableReader.internalOpen(newdesc,
++        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
++        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
++        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
-                                                            metadata,
++                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf,
 +                                                           maxDataAge,
-                                                            sstableMetadata,
-                                                            SSTableReader.OpenReason.NORMAL);
++                                                           metadata,
++                                                           finishType.openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
-         // try to save the summaries to disk
-         sstable.saveSummary(iwriter.builder, dbuilder);
-         iwriter = null;
-         dbuilder = null;
++
++        switch (finishType)
++        {
++            case NORMAL: case FINISH_EARLY:
++            // try to save the summaries to disk
++            sstable.saveSummary(iwriter.builder, dbuilder);
++            iwriter = null;
++            dbuilder = null;
++        }
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor to the new sstable and its metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
-         return close(this.repairedAt);
++        return close(FinishType.NORMAL, this.repairedAt);
 +    }
 +
-     private Pair<Descriptor, StatsMetadata> close(long repairedAt)
++    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
 +    {
++        switch (type)
++        {
++            case EARLY: case NORMAL:
++            iwriter.close();
++            dataFile.close();
++        }
 +
-         // index and filter
-         iwriter.close();
-         // main data, close will truncate if necessary
-         dataFile.close();
-         dataFile.writeFullChecksum(descriptor);
 +        // write sstable statistics
-         Map<MetadataType, MetadataComponent> metadataComponents = metadataCollector.finalizeMetadata(
-                                                                                     partitioner.getClass().getCanonicalName(),
-                                                                                     metadata.getBloomFilterFpChance(),
-                                                                                     repairedAt);
-         writeMetadata(descriptor, metadataComponents);
- 
-         // save the table of components
-         SSTable.appendTOC(descriptor, components);
++        Map<MetadataType, MetadataComponent> metadataComponents;
++        metadataComponents = metadataCollector
++                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
++                                               metadata.getBloomFilterFpChance(), repairedAt);
 +
 +        // remove the 'tmp' marker from all components
-         return Pair.create(SSTableWriter.rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
++        Descriptor descriptor = this.descriptor;
++        switch (type)
++        {
++            case NORMAL: case FINISH_EARLY:
++            dataFile.writeFullChecksum(descriptor);
++            writeMetadata(descriptor, metadataComponents);
++            // save the table of components
++            SSTable.appendTOC(descriptor, components);
++            descriptor = rename(descriptor, components);
++        }
 +
++        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +    }
 +
- 
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter implements Closeable
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to occur fully in the flushed portion of the index file
 +        DecoratedKey getMaxReadableKey(int offset)
 +        {
 +            long maxIndexLength = indexFile.getLastFlushOffset();
 +            return summary.getMaxReadableKey(maxIndexLength, offset);
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry)
 +        {
 +            bf.add(key.getKey());
 +            long indexPosition = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexPosition);
 +
 +            summary.maybeAddEntry(key, indexPosition);
 +            builder.addPotentialBoundary(indexPosition);
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // We can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // We can't reset dbuilder either, but that is the last thing called in afterAppend,
 +            // so we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}
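
The getMaxReadableKey() comment above captures the invariant behind early-opened readers: a key may only be exposed once its index entry has been fully flushed to disk, and callers can additionally back off by a number of entries. The following minimal sketch (illustrative names only, not the IndexSummaryBuilder API) shows that idea against a plain list of summary entries:

    import java.util.List;

    class ReadableBoundarySketch
    {
        // Returns the last key whose index entry ends at or before the flushed offset,
        // backed off by the requested number of entries; null if no key is safe yet.
        static <K> K maxReadableKey(List<K> keys, List<Long> entryEndOffsets, long lastFlushedOffset, int offset)
        {
            int lastSafe = -1;
            for (int i = 0; i < keys.size(); i++)
            {
                if (entryEndOffsets.get(i) <= lastFlushedOffset)
                    lastSafe = i;   // this entry is entirely on disk
                else
                    break;          // later entries cannot be on disk either
            }
            int target = lastSafe - offset;
            return target >= 0 ? keys.get(target) : null;
        }
    }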
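
The close() method above follows the usual flush-then-sync pattern before the bloom filter file is considered complete: flush buffered bytes to the OS, force them to the device via the file descriptor, then close. A self-contained sketch of just that pattern (class and method names here are illustrative, not Cassandra APIs):

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class SyncOnCloseSketch
    {
        static void writeDurably(String path, byte[] serializedFilter) throws IOException
        {
            try (FileOutputStream fos = new FileOutputStream(path);
                 DataOutputStream out = new DataOutputStream(fos))
            {
                out.write(serializedFilter);
                out.flush();        // push buffered bytes into the OS
                fos.getFD().sync(); // force the OS to write them to the device
            }                       // try-with-resources then closes both streams
        }
    }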
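
The mark()/resetAndTruncate() pair gives callers a way to back out a partially written index entry: record the file position before appending, and truncate back to it if the append fails. A minimal sketch of the same pattern using plain java.io (not the project's SequentialWriter/FileMark types):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    class MarkAndTruncateSketch
    {
        static void appendAtomically(RandomAccessFile file, byte[] entry) throws IOException
        {
            long mark = file.getFilePointer();  // mark(): remember where the entry starts
            try
            {
                file.write(entry);              // append the serialized entry
            }
            catch (IOException e)
            {
                file.setLength(mark);           // resetAndTruncate(): drop the partial write
                file.seek(mark);
                throw e;
            }
        }
    }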

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index b284f61,57f465f..7679e30
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@@ -19,6 -19,8 +19,8 @@@ package org.apache.cassandra.io.util
  
  import java.io.File;
  
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ 
  public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
  {
      public BufferedPoolingSegmentedFile(String path, long length)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index aa031e3,2f715da..325dc27
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@@ -19,6 -19,8 +19,8 @@@ package org.apache.cassandra.io.util
  
  import java.io.File;
  
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ 
  public class BufferedSegmentedFile extends SegmentedFile
  {
      public BufferedSegmentedFile(String path, long length)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 1803e69,11d091a..489a2b5
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@@ -20,6 -20,7 +20,7 @@@ package org.apache.cassandra.io.util
  import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
  import org.apache.cassandra.io.compress.CompressedSequentialWriter;
  import org.apache.cassandra.io.compress.CompressionMetadata;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  
  public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 4afe0a0,b788715..e0f50c2
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@@ -20,6 -20,7 +20,7 @@@ package org.apache.cassandra.io.util
  import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
  import org.apache.cassandra.io.compress.CompressedSequentialWriter;
  import org.apache.cassandra.io.compress.CompressionMetadata;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  
  public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index ccc03fc,3b2cc98..bf120a3
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@@ -28,6 -28,7 +28,7 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.io.FSReadError;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  
  public class MmappedSegmentedFile extends SegmentedFile

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SegmentedFile.java
index be549a6,badae56..d3b5bf2
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@@ -29,6 -29,7 +29,7 @@@ import org.apache.cassandra.config.Conf
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.FSReadError;
  import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.Pair;
  
  /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a51c3c3/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index d5e92dd,392936d..a4ab04f
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -32,12 -30,11 +32,8 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.db.ArrayBackedSortedColumns;
 -import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.config.KSMetaData;
- import org.apache.cassandra.db.ArrayBackedSortedColumns;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.DecoratedKey;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
@@@ -641,6 -614,73 +635,73 @@@ public class SSTableRewriterTest extend
  
      }
  
+     @Test
+     public void testAllKeysReadable() throws Exception
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         for (int i = 0; i < 100; i++)
+         {
+             DecoratedKey key = Util.dk(Integer.toString(i));
+             Mutation rm = new Mutation(KEYSPACE, key.getKey());
+             for (int j = 0; j < 10; j++)
+                 rm.add(CF, Util.cellname(Integer.toString(j)), ByteBufferUtil.EMPTY_BYTE_BUFFER, 100);
+             rm.apply();
+         }
+         cfs.forceBlockingFlush();
+         cfs.forceMajorCompaction();
+         validateKeys(keyspace);
+ 
+         assertEquals(1, cfs.getSSTables().size());
+         SSTableReader s = cfs.getSSTables().iterator().next();
+         Set<SSTableReader> compacting = new HashSet<>();
+         compacting.add(s);
+         cfs.getDataTracker().markCompacting(compacting);
+ 
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         SSTableRewriter.overrideOpenInterval(1);
+         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+         rewriter.switchWriter(w);
+         int keyCount = 0;
 -        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
++        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+              CompactionController controller = new CompactionController(cfs, compacting, 0))
+         {
+             while (scanner.hasNext())
+             {
+                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                 if (keyCount % 10 == 0)
+                 {
+                     rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 }
+                 keyCount++;
+                 validateKeys(keyspace);
+             }
+             try
+             {
+                 cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
+                 cfs.getDataTracker().unmarkCompacting(compacting);
+             }
+             catch (Throwable t)
+             {
+                 rewriter.abort();
+             }
+         }
+         validateKeys(keyspace);
+         Thread.sleep(1000);
+         validateCFS(cfs);
+     }
+ 
+     private void validateKeys(Keyspace ks)
+     {
+         for (int i = 0; i < 100; i++)
+         {
+             DecoratedKey key = Util.dk(Integer.toString(i));
+             ColumnFamily cf = Util.getColumnFamily(ks, key, CF);
+             assertTrue(cf != null);
+         }
+     }
+ 
      private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
      {
          ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);