Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/03 17:03:37 UTC

[1/5] git commit: Make sure unfinished compaction files are removed.

Repository: cassandra
Updated Branches:
  refs/heads/trunk e60a06cc8 -> 0f59629ce


Make sure unfinished compaction files are removed.

Patch by marcuse; reviewed by yukim for CASSANDRA-8124


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c316e78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c316e78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c316e78

Branch: refs/heads/trunk
Commit: 9c316e7858f6dbf9df892aff78431044aa104ed9
Parents: d230922
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:17:01 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableReader.java     |   6 +
 .../cassandra/io/sstable/SSTableRewriter.java   |  90 +++-
 .../io/sstable/SSTableRewriterTest.java         | 473 +++++++++++++++++++
 4 files changed, 555 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 494fb93..681d616 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
  * Fix shutdown when run as Windows service (CASSANDRA-8136)
  * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
  * Fix race in RecoveryManagerTest (CASSANDRA-8176)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 872f7df..40e708d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1599,6 +1599,12 @@ public class SSTableReader extends SSTable
         }
     }
 
+    @VisibleForTesting
+    int referenceCount()
+    {
+        return references.get();
+    }
+
     /**
      * Release reference to this SSTableReader.
      * If there is no one referring to this SSTable, and is marked as compacted,

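The @VisibleForTesting referenceCount() accessor added above is what the new SSTableRewriterTest (further down in this patch) uses to verify that every sstable left in the ColumnFamilyStore ends a rewrite with exactly one remaining reference. A minimal sketch of that check, mirroring the test's validateCFS helper (cfs is assumed to be the ColumnFamilyStore under test):

    for (SSTableReader sstable : cfs.getSSTables())
    {
        assertFalse(sstable.isMarkedCompacted());
        assertEquals(1, sstable.referenceCount()); // no leaked or dropped references after the rewrite
    }
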
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 76677ac..2c9fe7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -55,8 +58,7 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class SSTableRewriter
 {
-
-    private static final long preemptiveOpenInterval;
+    private static long preemptiveOpenInterval;
     static
     {
         long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -65,6 +67,14 @@ public class SSTableRewriter
         preemptiveOpenInterval = interval;
     }
 
+    private boolean isFinished = false;
+
+    @VisibleForTesting
+    static void overrideOpenInterval(long size)
+    {
+        preemptiveOpenInterval = size;
+    }
+
     private final DataTracker dataTracker;
     private final ColumnFamilyStore cfs;
 
@@ -77,6 +87,8 @@ public class SSTableRewriter
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
     private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+    private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
+    private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
     private final OperationType rewriteType; // the type of rewrite/compaction being performed
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
@@ -187,20 +199,32 @@ public class SSTableRewriter
     {
         if (writer == null)
             return;
+
+        switchWriter(null);
+
         moveStarts(null, Functions.forMap(originalStarts), true);
-        List<SSTableReader> close = new ArrayList<>(finished);
+
+        List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
         if (currentlyOpenedEarly != null)
             close.add(currentlyOpenedEarly);
+
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            // we should close the bloom filter if we have not opened an sstable reader from this
+            // writer (it will get closed when we release the sstable reference below):
+            w.left.abort(w.right == null);
+        }
+
         // also remove already completed SSTables
         for (SSTableReader sstable : close)
             sstable.markObsolete();
+
         // releases reference in replaceReaders
         if (!isOffline)
         {
             dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
             dataTracker.unmarkCompacting(close);
         }
-        writer.abort(currentlyOpenedEarly == null);
     }
 
     /**
@@ -208,6 +232,11 @@ public class SSTableRewriter
      * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
      * is true, we are instead restoring the starts of the readers from before the rewriting began
      *
+     * note that we replace an existing sstable with a new *instance* of the same sstable: the replacement
+     * sstable .equals() the old one, but it is a new instance. For example, since we releaseReference() on the
+     * old instance, its reference count drops to 0, and starting a new compaction with that old instance
+     * would throw exceptions.
+     *
      * @param newReader the rewritten reader that replaces them for this region
      * @param newStarts a function mapping a reader's descriptor to their new start value
      * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
@@ -284,11 +313,15 @@ public class SSTableRewriter
             writer = newWriter;
             return;
         }
-        // tmp = false because later we want to query it with descriptor from SSTableReader
-        SSTableReader reader = writer.closeAndOpenReader(maxAge);
-        finished.add(reader);
-        replaceReader(currentlyOpenedEarly, reader, false);
-        moveStarts(reader, Functions.constant(reader.last), false);
+        // we leave it as a tmp file, but we open it early and add it to the dataTracker
+        SSTableReader reader = writer.openEarly(maxAge);
+        if (reader != null)
+        {
+            finishedOpenedEarly.add(reader);
+            replaceReader(currentlyOpenedEarly, reader, false);
+            moveStarts(reader, Functions.constant(reader.last), false);
+        }
+        finishedWriters.add(Pair.create(writer, reader));
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
@@ -306,23 +339,48 @@ public class SSTableRewriter
     {
         finish(cleanupOldReaders, -1);
     }
+
+    /**
+     * Finishes the new file(s)
+     *
+     * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
+     * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
+     * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     *
+     * @param cleanupOldReaders if we should replace the old files with the new ones
+     * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
+     *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
+     *                   repair time.
+     */
     public void finish(boolean cleanupOldReaders, long repairedAt)
     {
         if (writer.getFilePointer() > 0)
         {
-            SSTableReader reader = repairedAt < 0 ?
-                                    writer.closeAndOpenReader(maxAge) :
-                                    writer.closeAndOpenReader(maxAge, repairedAt);
+            SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
             finished.add(reader);
             replaceReader(currentlyOpenedEarly, reader, false);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         else
         {
-            writer.abort();
-            writer = null;
+            writer.abort(true);
+        }
+        // make real sstables of the written ones:
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            if (w.left.getFilePointer() > 0)
+            {
+                SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
+                finished.add(newReader);
+                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+                replaceReader(w.right, newReader, false);
+            }
+            else
+            {
+                assert w.right == null;
+                w.left.abort(true);
+            }
         }
-
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
@@ -337,10 +395,12 @@ public class SSTableRewriter
                 reader.releaseReference();
             }
         }
+        isFinished = true;
     }
 
     public List<SSTableReader> finished()
     {
+        assert isFinished;
         return finished;
     }
 }
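
The finish(cleanupOldReaders, repairedAt) contract documented above is exercised by the new SSTableRewriterTest below; as a reference, a minimal usage sketch of the cleanupOldReaders == false path (the cfs, sstables, maxAge, directory, scanner, controller and getWriter(..) names are assumed to be set up as in that test):

    SSTableRewriter rewriter = new SSTableRewriter(cfs, sstables, maxAge, OperationType.COMPACTION, false);
    rewriter.switchWriter(getWriter(cfs, directory));
    while (scanner.hasNext())
        rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
    // cleanupOldReaders == false: the new sstables are opened and added to the DataTracker,
    // but the old ones are not marked compacted -- that is left to the caller:
    rewriter.finish(false);
    cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, rewriter.finished(), OperationType.COMPACTION);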

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
new file mode 100644
index 0000000..8b203ac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableRewriterTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "Keyspace1";
+    private static final String CF = "Standard1";
+    @Test
+    public void basicTest() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        for (int j = 0; j < 100; j ++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+            Mutation rm = new Mutation(KEYSPACE, key);
+            rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+        AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        ICompactionScanner scanner = scanners.scanners.get(0);
+        CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+        writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+        while(scanner.hasNext())
+        {
+            AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+            writer.append(row);
+        }
+        writer.finish();
+
+        validateCFS(cfs);
+
+    }
+
+
+    @Test
+    public void testFileRemoval() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s2 = writer.openEarly(1000);
+        assertTrue(s != s2);
+        assertFileCounts(dir.list(), 2, 3);
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 3);
+        writer.abort(false);
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testFileRemovalNoAbort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        //assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        writer.closeAndOpenReader();
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+
+    @Test
+    public void testNumberOfFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        // tmplink and tmp files should be gone:
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish(false);
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files + 1, cfs.getSSTables().size());
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+
+    @Test
+    public void testNumberOfFiles_abort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.abort();
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+
+    }
+
+    @Test
+    public void testNumberOfFiles_abort2() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                // test aborting when we have nothing written in the new file
+                rewriter.abort();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                // test finishing when we have nothing written in the new file
+                rewriter.finish();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_truncate() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        cfs.truncateBlocking();
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testSmallFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 400);
+        DecoratedKey origFirst = s.first;
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(1000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+            {
+                assertEquals(1, cfs.getSSTables().size()); // we don't open small files early ...
+                assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < count / 100; i++)
+            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        String filename = cfs.getTempSSTablePath(dir);
+
+        SSTableWriter writer = new SSTableWriter(filename,
+                0,
+                0,
+                cfs.metadata,
+                StorageService.getPartitioner(),
+                new MetadataCollector(cfs.metadata.comparator));
+
+        for (int i = 0; i < count * 5; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        return writer.closeAndOpenReader();
+    }
+
+    private void validateCFS(ColumnFamilyStore cfs)
+    {
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.referenceCount());
+        }
+        assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+    }
+
+
+    private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        for (String f : files)
+        {
+            if (f.contains("-tmplink-"))
+                tmplinkcount++;
+            if (f.contains("-tmp-"))
+                tmpcount++;
+        }
+        assertEquals(expectedtmplinkCount, tmplinkcount);
+        assertEquals(expectedtmpCount, tmpcount);
+    }
+
+    private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+    {
+        String filename = cfs.getTempSSTablePath(directory);
+        return new SSTableWriter(filename,
+                                 0,
+                                 0,
+                                 cfs.metadata,
+                                 StorageService.getPartitioner(),
+                                 new MetadataCollector(cfs.metadata.comparator));
+    }
+}


[4/5] Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c3062f7,0000000..7d4b8f3
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1881 -1,0 +1,1887 @@@
 +package org.apache.cassandra.io.sstable.format;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 +import org.apache.cassandra.io.compress.CompressedThrottledReader;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public abstract class SSTableReader extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    protected final AtomicInteger references = new AtomicInteger(1);
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    /**
 +     * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
 +     * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
 +     * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
 +     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
 +     */
 +    protected Object replaceLock = new Object();
 +    protected SSTableReader replacedBy;
 +    private SSTableReader replaces;
 +    private SSTableDeletingTask deletingTask;
 +    private Runnable runOnClose;
 +
 +    @VisibleForTesting
 +    public RestorableMeter readMeter;
 +    protected ScheduledFuture readMeterSyncFuture;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If cardinality estimators are available on all given sstables, then this method uses them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode (such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +
 +        return sstable;
 +    }
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +
 +        deletingTask = new SSTableDeletingTask(this);
 +
 +        // Don't track read rates for tables in the system keyspace.  Also don't track reads for special operations (like early open)
 +        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
 +        if (Keyspace.SYSTEM_KS.equals(desc.ksname) || openReason != OpenReason.NORMAL)
 +        {
 +            readMeter = null;
 +            readMeterSyncFuture = null;
 +            return;
 +        }
 +
 +        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (!isCompacted.get())
 +                {
 +                    meterSyncThrottle.acquire();
 +                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                }
 +            }
 +        }, 1, 5, TimeUnit.MINUTES);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    private void tidy(boolean release)
 +    {
 +        if (readMeterSyncFuture != null)
 +            readMeterSyncFuture.cancel(false);
 +
 +        if (references.get() != 0)
 +        {
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +        }
 +
 +        synchronized (replaceLock)
 +        {
 +            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 +
 +            if (replacedBy != null)
 +            {
 +                closeBf = replacedBy.bf != bf;
 +                closeSummary = replacedBy.indexSummary != indexSummary;
 +                closeFiles = replacedBy.dfile != dfile;
 +                // if the replacement sstablereader uses a different path, clean up our paths
 +                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
 +            }
 +
 +            if (replaces != null)
 +            {
 +                closeBf &= replaces.bf != bf;
 +                closeSummary &= replaces.indexSummary != indexSummary;
 +                closeFiles &= replaces.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
 +            }
 +
 +            boolean deleteAll = false;
 +            if (release && isCompacted.get())
 +            {
 +                assert replacedBy == null;
 +                if (replaces != null)
 +                {
 +                    replaces.replacedBy = null;
 +                    replaces.deletingTask = deletingTask;
 +                    replaces.markObsolete();
 +                }
 +                else
 +                {
 +                    deleteAll = true;
 +                }
 +            }
 +            else
 +            {
 +                if (replaces != null)
 +                    replaces.replacedBy = replacedBy;
 +                if (replacedBy != null)
 +                    replacedBy.replaces = replaces;
 +            }
 +
 +            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
 +        }
 +    }
 +
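 +    // The actual cleanup runs asynchronously: we issue a barrier on the CFS read ordering (when the CFS
 +    // still exists) and wait on it before closing or deleting anything, so in-flight reads never touch
 +    // a closed segment or a deleted file.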
 +    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
 +    {
 +        if (references.get() != 0)
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +
 +        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +        final OpOrder.Barrier barrier;
 +        if (cfs != null)
 +        {
 +            barrier = cfs.readOrdering.newBarrier();
 +            barrier.issue();
 +        }
 +        else
 +            barrier = null;
 +
 +        StorageService.tasks.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (barrier != null)
 +                    barrier.await();
 +                if (closeBf)
 +                    bf.close();
 +                if (closeSummary)
 +                    indexSummary.close();
 +                if (closeFiles)
 +                {
 +                    ifile.cleanup();
 +                    dfile.cleanup();
 +                }
 +                if (runOnClose != null)
 +                    runOnClose.run();
 +                if (deleteAll)
 +                {
 +                    /**
 +                     * Do the OS a favour and suggest (using an fadvise call) that we
 +                     * don't want to see pages of this SSTable in memory anymore.
 +                     *
 +                     * NOTE: We can't use madvise in Java because it requires the address of
 +                     * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
 +                     */
 +                    dropPageCache();
 +                    deletingTask.run();
 +                }
 +                else if (deleteFiles)
 +                {
 +                    FileUtils.deleteWithConfirm(new File(dfile.path));
 +                    FileUtils.deleteWithConfirm(new File(ifile.path));
 +                }
 +            }
 +        });
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // the bf fp chance recorded in the sstable metadata no longer matches the configured value, so rebuild the filter.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
 +     *
 +     * @param recreateBloomFilter true to recreate the bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be rebuilt
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null;
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey;
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey.getKey());
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * If the loaded index summary has a different index interval from the current value stored in the schema,
 +     * the Summary.db file is deleted and this returns false so that the summary is rebuilt.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; delete it and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // the summary file may be partially written, so delete it and let it be rebuilt on the next load.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +            replacedBy = replacement;
 +            replacement.replaces = this;
 +            replacement.replaceLock = replaceLock;
 +        }
 +    }
 +
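 +    // When the sstable's effective start moves forward (e.g. because the leading portion has been rewritten
 +    // elsewhere), the runOnClose hooks below also ask the OS to drop the now-unneeded prefix of the data and
 +    // index files from the page cache once this reader is finally closed.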
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            if (newStart.compareTo(this.first) > 0)
 +            {
 +                if (newStart.compareTo(this.last) > 0)
 +                {
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, 0);
 +                            CLibrary.trySkipCache(ifile.path, 0, 0);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +                else
 +                {
 +                    final long dataStart = getPosition(newStart, Operator.GE).position;
 +                    final long indexStart = getIndexScanPosition(newStart);
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +            }
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex);
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() throws IOException
 +    {
 +        indexSummary.close();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary).
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return -1;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
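 +    // A negative binarySearch result encodes the insertion point as -(insertionPoint) - 1;
 +    // e.g. a result of -3 means insertion point 2, so the last summary entry preceding the key is index 1.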
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        // We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
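 +        // e.g. at full sampling (SL == BASE_SAMPLING_LEVEL) each sampled key stands in for roughly
 +        // min_index_interval keys, so the scale factor reduces to the min index interval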
 +        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // the left bound of a range is exclusive, so skip the exact match
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // the right bound of a range is inclusive, so we use the index just before the insertion point binarySearch gives us,
 +                // since that will be the last index we return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
 +            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
 +            long left = idxLeft == null ? -1 : idxLeft.position;
 +            if (left == -1)
 +                // left is past the end of the file
 +                continue;
 +            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
 +            long right = idxRight == null ? -1 : idxRight.position;
 +            if (right == -1 || Range.isWrapAround(range.left, range.right))
 +                // right is past the end of the file, or it wraps
 +                right = uncompressedLength();
 +            if (left == right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0)
 +        {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                    keyCacheHit.incrementAndGet();
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
 +
 +    // Corresponds to a names query (fetching specific columns by name)
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    // Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        long sampledPosition = getIndexScanPosition(token);
 +        if (sampledPosition == -1)
 +            sampledPosition = 0;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The on-disk length in bytes of this SSTable. For
 +     * compressed files, this is not the same thing as the uncompressed data length (see
 +     * uncompressedLength())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
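 +    // Atomically take a reference; fails (returns false) once the count has dropped to zero,
 +    // so a reader that has already been released for tidying can never be resurrected.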
 +    public boolean acquireReference()
 +    {
 +        while (true)
 +        {
 +            int n = references.get();
 +            if (n <= 0)
 +                return false;
 +            if (references.compareAndSet(n, n + 1))
 +                return true;
 +        }
 +    }
 +
++    @VisibleForTesting
++    public int referenceCount()
++    {
++        return references.get();
++    }
++
 +    /**
 +     * Release reference to this SSTableReader.
 +     * If no one is referring to this SSTable and it is marked as compacted,
 +     * all resources are cleaned up and files are deleted eventually.
 +     */
 +    public void releaseReference()
 +    {
 +        if (references.decrementAndGet() == 0)
 +            tidy(true);
 +        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +        }
 +        return !isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ICompactionScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in local System.currentTimeMillis() time).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable, measured in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        synchronized (replaceLock)
 +        {
 +            SSTableReader cur = this, next = replacedBy;
 +            while (next != null)
 +            {
 +                cur = next;
 +                next = next.replacedBy;
 +            }
 +            return cur;
 +        }
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
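 +        // e.g. for EQ, a negative result means the current key is already past the target (stop),
 +        // 0 means an exact match, and a positive result means the target may still lie ahead.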
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return compression
 +                ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
 +                : ThrottledReader.open(new File(getFilename()), limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return compression
 +                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
 +                : RandomAccessReader.open(new File(getFilename()));
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return RandomAccessReader.open(new File(getIndexFilename()));
 +    }
 +
 +    /**
 +     * @param component component to get the timestamp for.
 +     * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * @param sstables
 +     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
 +     */
 +    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    {
 +        SSTableReader failed = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (!sstable.acquireReference())
 +            {
 +                failed = sstable;
 +                break;
 +            }
 +        }
 +
 +        if (failed == null)
 +            return true;
 +
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable == failed)
 +                break;
 +            sstable.releaseReference();
 +        }
 +        return false;
 +    }
 +
 +    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    {
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sstable.releaseReference();
 +        }
 +    }
 +
 +    private void dropPageCache()
 +    {
 +        dropPageCache(dfile.path);
 +        dropPageCache(ifile.path);
 +    }
 +
 +    private void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    protected class EmptyCompactionScanner implements ICompactionScanner
 +    {
 +        private final String filename;
 +
 +        public EmptyCompactionScanner(String filename)
 +        {
 +            this.filename = filename;
 +        }
 +
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public String getBackingFiles()
 +        {
 +            return filename;
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            return false;
 +        }
 +
 +        public OnDiskAtomIterator next()
 +        {
 +            return null;
 +        }
 +
 +        public void close() throws IOException { }
 +
 +        public void remove() { }
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}
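
As a rough illustration only (not part of the patch), the reference counting contract that the new
@VisibleForTesting referenceCount() hook makes observable is typically used along these lines; the
readFrom() call below is a placeholder for whatever work needs the readers to stay on disk:

    Collection<SSTableReader> sstables = cfs.getSSTables();
    if (SSTableReader.acquireReferences(sstables))
    {
        try
        {
            // files cannot be deleted out from under us while references are held
            readFrom(sstables); // placeholder
        }
        finally
        {
            SSTableReader.releaseReferences(sstables);
        }
    }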

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index e6e5d55,5ed4f4a..fffc310
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -29,17 -30,9 +30,18 @@@ import java.util.Collection
  import java.util.List;
  import java.util.concurrent.ExecutionException;
  
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.locator.SimpleStrategy;
 +import org.junit.BeforeClass;
 +import org.junit.After;
 +import org.junit.Test;
 +
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
- import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
@@@ -47,13 -40,14 +49,15 @@@ import org.apache.cassandra.db.Mutation
  import org.apache.cassandra.dht.BytesToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 -import org.apache.cassandra.io.sstable.SSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 +import org.apache.cassandra.service.ActiveRepairService;
+ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+ import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +
  import org.junit.After;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import com.google.common.collect.Iterables;
@@@ -134,36 -94,65 +138,72 @@@ public class AntiCompactionTes
          assertEquals(repairedKeys, 4);
          assertEquals(nonRepairedKeys, 6);
      }
 +
+     @Test
+     public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.disableAutoCompaction();
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+         long origSize = s.bytesOnDisk();
+         System.out.println(cfs.metric.liveDiskSpaceUsed.count());
+         Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
+         Collection<SSTableReader> sstables = cfs.getSSTables();
+         SSTableReader.acquireReferences(sstables);
+         CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+         long sum = 0;
+         for (SSTableReader x : cfs.getSSTables())
+             sum += x.bytesOnDisk();
+         assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+         assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
+ 
+     }
+ 
+     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+     {
+         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         for (int i = 0; i < count; i++)
+             cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+         File dir = cfs.directories.getDirectoryForNewSSTables();
+         String filename = cfs.getTempSSTablePath(dir);
+ 
 -        SSTableWriter writer = new SSTableWriter(filename,
 -                0,
 -                0,
 -                cfs.metadata,
 -                StorageService.getPartitioner(),
 -                new MetadataCollector(cfs.metadata.comparator));
++        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
+ 
+         for (int i = 0; i < count * 5; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         return writer.closeAndOpenReader();
+     }
  
 -    @Test
 -    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
 +    public void generateSStable(ColumnFamilyStore store, String Suffix)
      {
 -        ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 -        assertEquals(store.getSSTables().size(), sstables.size());
 -        Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
 -        List<Range<Token>> ranges = Arrays.asList(range);
 +        long timestamp = System.currentTimeMillis();
 +        for (int i = 0; i < 10; i++)
 +        {
 +            DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix);
 +            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
 +            for (int j = 0; j < 10; j++)
 +                rm.add("Standard1", Util.cellname(Integer.toString(j)),
 +                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
 +                        timestamp,
 +                        0);
 +            rm.apply();
 +        }
 +        store.forceBlockingFlush();
 +    }
  
 -        SSTableReader.acquireReferences(sstables);
 -        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
 +    @Test
 +    public void antiCompactTenSTC() throws InterruptedException, ExecutionException, IOException {
 +        antiCompactTen("SizeTieredCompactionStrategy");
 +    }
  
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 +    @Test
 +    public void antiCompactTenLC() throws InterruptedException, ExecutionException, IOException {
 +        antiCompactTen("LeveledCompactionStrategy");
      }
  
 -    private ColumnFamilyStore prepareColumnFamilyStore()
 +    public void antiCompactTen(String compactionStrategy) throws InterruptedException, ExecutionException, IOException
      {
          Keyspace keyspace = Keyspace.open(KEYSPACE1);
          ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------


[2/5] git commit: Refactor how we track live size

Posted by ma...@apache.org.
Refactor how we track live size

Patch by marcuse; reviewed by yukim for CASSANDRA-7852
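
The practical effect of the refactor is that liveDiskSpaceUsed is only adjusted
when sstables are added to or removed from the DataTracker, so the metric should
always match the on-disk bytes of the live sstable set. A minimal sketch of that
invariant, as the test changes further down in this mail assert it (assuming an
initialized ColumnFamilyStore cfs):

    long liveBytes = 0;
    for (SSTableReader sstable : cfs.getSSTables())
        liveBytes += sstable.bytesOnDisk();
    // after this patch the invariant holds across compaction, cleanup, scrub and upgrade
    assertEquals(liveBytes, cfs.metric.liveDiskSpaceUsed.count());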


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5160c916
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5160c916
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5160c916

Branch: refs/heads/trunk
Commit: 5160c916c90886f69023ddba0078a624e5cf202d
Parents: 9c316e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:39:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    | 109 ++++++++++++-------
 .../db/compaction/CompactionManager.java        |  26 ++---
 .../cassandra/db/compaction/CompactionTask.java |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  12 +-
 .../cassandra/db/compaction/Upgrader.java       |  31 +++---
 .../io/sstable/IndexSummaryManager.java         |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  90 ++++-----------
 .../db/compaction/AntiCompactionTest.java       |  48 +++++++-
 .../io/sstable/IndexSummaryManagerTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |  57 ++++++----
 12 files changed, 219 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 681d616..32083cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Refactor how we track live size (CASSANDRA-7852)
  * Make sure unfinished compaction files are removed (CASSANDRA-8124)
  * Fix shutdown when run as Windows service (CASSANDRA-8136)
  * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7393323..7df2b75 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -254,33 +254,36 @@ public class DataTracker
 
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
-        replace(sstables, Collections.<SSTableReader>emptyList());
+        removeSSTablesFromTracker(sstables);
+        releaseReferences(sstables, false);
         notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
     }
 
+    /**
+     *
+     * @param oldSSTables
+     * @param allReplacements
+     * @param compactionType
+     */
     // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
     // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
-    public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
+    public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
     {
-        replace(sstables, Collections.<SSTableReader>emptyList());
-        notifySSTablesChanged(sstables, allReplacements, compactionType);
-        for (SSTableReader sstable : allReplacements)
-        {
-            long bytesOnDisk = sstable.bytesOnDisk();
-            cfstore.metric.totalDiskSpaceUsed.inc(bytesOnDisk);
-            cfstore.metric.liveDiskSpaceUsed.inc(bytesOnDisk);
-        }
+        removeSSTablesFromTracker(oldSSTables);
+        releaseReferences(oldSSTables, false);
+        notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
+        addNewSSTablesSize(allReplacements);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)
     {
-        replace(Collections.<SSTableReader>emptyList(), sstables);
+        addSSTablesToTracker(sstables);
         // no notifications or backup necessary
     }
 
     public void addSSTables(Collection<SSTableReader> sstables)
     {
-        replace(Collections.<SSTableReader>emptyList(), sstables);
+        addSSTablesToTracker(sstables);
         for (SSTableReader sstable : sstables)
         {
             maybeIncrementallyBackup(sstable);
@@ -289,6 +292,32 @@ public class DataTracker
     }
 
     /**
+     * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
+     *
+     * @param toReplace
+     * @param replaceWith
+     */
+    public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        replaceReaders(toReplace, replaceWith, true);
+    }
+
+    /**
+     * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
+     *
+     * note that we dont track the live size of these sstables
+     * @param toReplace
+     * @param replaceWith
+     */
+    public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        for (SSTableReader s : toReplace)
+            assert s.openReason.equals(SSTableReader.OpenReason.EARLY);
+        // note that we can replace an early opened file with a real one
+        replaceReaders(toReplace, replaceWith, false);
+    }
+
+    /**
      * removes all sstables that are not busy compacting.
      */
     public void unreferenceSSTables()
@@ -310,7 +339,8 @@ public class DataTracker
             return;
         }
         notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
-        postReplace(notCompacting, Collections.<SSTableReader>emptySet(), true);
+        removeOldSSTablesSize(notCompacting);
+        releaseReferences(notCompacting, true);
     }
 
     /**
@@ -344,11 +374,11 @@ public class DataTracker
     void init()
     {
         view.set(new View(
-                         ImmutableList.of(new Memtable(cfstore)),
-                         ImmutableList.<Memtable>of(),
-                         Collections.<SSTableReader>emptySet(),
-                         Collections.<SSTableReader>emptySet(),
-                         SSTableIntervalTree.empty()));
+                ImmutableList.of(new Memtable(cfstore)),
+                ImmutableList.<Memtable>of(),
+                Collections.<SSTableReader>emptySet(),
+                Collections.<SSTableReader>emptySet(),
+                SSTableIntervalTree.empty()));
     }
 
     /**
@@ -358,7 +388,7 @@ public class DataTracker
      * @param oldSSTables replaced readers
      * @param newSSTables replacement readers
      */
-    public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
+    private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
     {
         View currentView, newView;
         do
@@ -369,7 +399,7 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
 
         if (!oldSSTables.isEmpty() && notify)
-            notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION);
+            notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
 
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
@@ -378,29 +408,28 @@ public class DataTracker
             sstable.releaseReference();
     }
 
-    private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+    private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
     {
-        if (!cfstore.isValid())
-        {
-            removeOldSSTablesSize(replacements, false);
-            replacements = Collections.emptyList();
-        }
-
         View currentView, newView;
         do
         {
             currentView = view.get();
-            newView = currentView.replace(oldSSTables, replacements);
+            newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
         }
         while (!view.compareAndSet(currentView, newView));
-
-        postReplace(oldSSTables, replacements, false);
+        removeOldSSTablesSize(oldSSTables);
     }
 
-    private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted)
+    private void addSSTablesToTracker(Collection<SSTableReader> sstables)
     {
-        addNewSSTablesSize(replacements);
-        removeOldSSTablesSize(oldSSTables, tolerateCompacted);
+        View currentView, newView;
+        do
+        {
+            currentView = view.get();
+            newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
+        }
+        while (!view.compareAndSet(currentView, newView));
+        addNewSSTablesSize(sstables);
     }
 
     private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
@@ -418,7 +447,7 @@ public class DataTracker
         }
     }
 
-    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
     {
         for (SSTableReader sstable : oldSSTables)
         {
@@ -428,13 +457,15 @@ public class DataTracker
             long size = sstable.bytesOnDisk();
             StorageMetrics.load.dec(size);
             cfstore.metric.liveDiskSpaceUsed.dec(size);
+        }
+    }
 
-            // tolerateCompacted will be true when the CFS is no longer valid (dropped). If there were ongoing
-            // compactions when it was invalidated, sstables may already be marked compacted, so we should
-            // tolerate that (see CASSANDRA-5957)
+    private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+    {
+        for (SSTableReader sstable : oldSSTables)
+        {
             boolean firstToCompact = sstable.markObsolete();
-            assert (tolerateCompacted || firstToCompact) : sstable + " was already marked compacted";
-
+            assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
             sstable.releaseReference();
         }
     }
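
With replaceReaders() now private, code outside DataTracker picks one of the two
new entry points. A hedged usage sketch (the reader variables are assumed; the
call shapes match the IndexSummaryManager and SSTableRewriter changes below):

    // same logical sstable, new reader instance (e.g. resampled index summary);
    // live size is unchanged and compaction strategies get the new instance:
    tracker.replaceWithNewInstances(Collections.singleton(oldInstance),
                                    Collections.singleton(newInstance));

    // tmplink readers opened early during a rewrite; compaction strategies are
    // not told about these and their live size is not tracked:
    tracker.replaceEarlyOpenedFiles(Collections.singleton(earlyOpened),
                                    Collections.singleton(finalReader));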

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 51f45b8..84c3cb5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -688,8 +688,9 @@ public class CompactionManager implements CompactionManagerMBean
         CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
         metrics.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
-
+        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+        List<SSTableReader> finished;
         try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -711,7 +712,8 @@ public class CompactionManager implements CompactionManagerMBean
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
             cfs.indexManager.flushIndexesBlocking();
 
-            writer.finish();
+            finished = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
         }
         catch (Throwable e)
         {
@@ -724,17 +726,16 @@ public class CompactionManager implements CompactionManagerMBean
             metrics.finishCompaction(ci);
         }
 
-        List<SSTableReader> results = writer.finished();
-        if (!results.isEmpty())
+        if (!finished.isEmpty())
         {
             String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             long startsize = sstable.onDiskLength();
             long endsize = 0;
-            for (SSTableReader newSstable : results)
+            for (SSTableReader newSstable : finished)
                 endsize += newSstable.onDiskLength();
             double ratio = (double) endsize / (double) startsize;
-            logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+            logger.info(String.format(format, finished.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
         }
 
     }
@@ -994,8 +995,8 @@ public class CompactionManager implements CompactionManagerMBean
             sstableAsSet.add(sstable);
 
             File destination = cfs.directories.getDirectoryForNewSSTables();
-            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
-            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
@@ -1024,11 +1025,10 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
                 // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-                repairedSSTableWriter.finish(false, repairedAt);
-                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
                 // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
-                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
-                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
             }
             catch (Throwable e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d2ae04a..b442482 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -152,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
             {
                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
-
+                List<SSTableReader> newSStables;
                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                 // replace the old entries.  Track entries to preheat here until then.
                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -161,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (collector != null)
                     collector.beginCompaction(ci);
                 long lastCheckObsoletion = start;
-                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
                 try
                 {
                     if (!iter.hasNext())
@@ -197,7 +197,7 @@ public class CompactionTask extends AbstractCompactionTask
                     }
 
                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-                    writer.finish(false);
+                    newSStables = writer.finish();
                 }
                 catch (Throwable t)
                 {
@@ -217,7 +217,6 @@ public class CompactionTask extends AbstractCompactionTask
                 }
 
                 Collection<SSTableReader> oldSStables = this.sstables;
-                List<SSTableReader> newSStables = writer.finished();
                 if (!offline)
                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b3d098d..0cd71f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -22,6 +22,7 @@ import java.io.*;
 import java.util.*;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.*;
@@ -107,7 +108,8 @@ public class Scrubber implements Closeable
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
-        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
+        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -256,9 +258,11 @@ public class Scrubber implements Closeable
             }
 
             // finish obsoletes the old sstable
-            writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
-            if (!writer.finished().isEmpty())
-                newSstable = writer.finished().get(0);
+            List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+            if (!finished.isEmpty())
+                newSstable = finished.get(0);
+            if (!isOffline)
+                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index f102fef..39f668d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,7 +36,6 @@ public class Upgrader
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
-    private final Set<SSTableReader> toUpgrade;
     private final File directory;
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -49,7 +49,6 @@ public class Upgrader
     {
         this.cfs = cfs;
         this.sstable = sstable;
-        this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
         this.outputHandler = outputHandler;
 
         this.directory = new File(sstable.getFilename()).getParentFile();
@@ -57,8 +56,8 @@ public class Upgrader
         this.controller = new UpgradeController(cfs);
 
         this.strategy = cfs.getCompactionStrategy();
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes());
+        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes());
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
@@ -68,27 +67,22 @@ public class Upgrader
 
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors
-        // -- note that we always only have one SSTable in toUpgrade here:
-        for (SSTableReader sstable : toUpgrade)
+        sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+        for (Integer i : sstable.getAncestors())
         {
-            sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
-            for (Integer i : sstable.getAncestors())
-            {
-                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                    sstableMetadataCollector.addAncestor(i);
-            }
-            sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+            if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                sstableMetadataCollector.addAncestor(i);
         }
-
+        sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
     }
 
     public void upgrade()
     {
         outputHandler.output("Upgrading " + sstable);
-
-        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+        Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
         {
             Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -98,7 +92,8 @@ public class Upgrader
                 writer.append(row);
             }
 
-            writer.finish();
+            List<SSTableReader> sstables = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(toUpgrade, sstables, OperationType.UPGRADE_SSTABLES);
             outputHandler.output("Upgrade of " + sstable + " complete.");
 
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index cc60b4d..65b25a4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -416,7 +416,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
 
         for (DataTracker tracker : replacedByTracker.keySet())
         {
-            tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker), true);
+            tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
             newSSTables.addAll(replacementsByTracker.get(tracker));
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2c9fe7e..4d5a06f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.sstable;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,7 +35,6 @@ import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -67,8 +65,6 @@ public class SSTableRewriter
         preemptiveOpenInterval = interval;
     }
 
-    private boolean isFinished = false;
-
     @VisibleForTesting
     static void overrideOpenInterval(long size)
     {
@@ -86,16 +82,14 @@ public class SSTableRewriter
     private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
-    private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
     private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
     private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
-    private final OperationType rewriteType; // the type of rewrite/compaction being performed
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
 
-    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
         this.rewriting = rewriting;
         for (SSTableReader sstable : rewriting)
@@ -106,7 +100,6 @@ public class SSTableRewriter
         this.dataTracker = cfs.getDataTracker();
         this.cfs = cfs;
         this.maxAge = maxAge;
-        this.rewriteType = rewriteType;
         this.isOffline = isOffline;
     }
 
@@ -147,28 +140,18 @@ public class SSTableRewriter
     // attempts to append the row, if fails resets the writer position
     public RowIndexEntry tryAppend(AbstractCompactedRow row)
     {
-        mark();
+        writer.mark();
         try
         {
             return append(row);
         }
         catch (Throwable t)
         {
-            resetAndTruncate();
+            writer.resetAndTruncate();
             throw t;
         }
     }
 
-    private void mark()
-    {
-        writer.mark();
-    }
-
-    private void resetAndTruncate()
-    {
-        writer.resetAndTruncate();
-    }
-
     private void maybeReopenEarly(DecoratedKey key)
     {
         if (FBUtilities.isUnix() && writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
@@ -186,7 +169,7 @@ public class SSTableRewriter
                 SSTableReader reader = writer.openEarly(maxAge);
                 if (reader != null)
                 {
-                    replaceReader(currentlyOpenedEarly, reader, false);
+                    replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
                     currentlyOpenedEarly = reader;
                     currentlyOpenedEarlyAt = writer.getFilePointer();
                     moveStarts(reader, Functions.constant(reader.last), false);
@@ -222,7 +205,7 @@ public class SSTableRewriter
         // releases reference in replaceReaders
         if (!isOffline)
         {
-            dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
+            dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
             dataTracker.unmarkCompacting(close);
         }
     }
@@ -276,12 +259,14 @@ public class SSTableRewriter
                 }));
             }
         }
-        replaceReaders(toReplace, replaceWith, true);
+        cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
         rewriting.removeAll(toReplace);
         rewriting.addAll(replaceWith);
     }
 
-    private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith, boolean notify)
+
+
+    private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
     {
         if (isOffline)
             return;
@@ -296,14 +281,7 @@ public class SSTableRewriter
             dataTracker.markCompacting(Collections.singleton(replaceWith));
             toReplaceSet = Collections.emptySet();
         }
-        replaceReaders(toReplaceSet, Collections.singleton(replaceWith), notify);
-    }
-
-    private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith, boolean notify)
-    {
-        if (isOffline)
-            return;
-        dataTracker.replaceReaders(toReplace, replaceWith, notify);
+        dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
     }
 
     public void switchWriter(SSTableWriter newWriter)
@@ -318,7 +296,7 @@ public class SSTableRewriter
         if (reader != null)
         {
             finishedOpenedEarly.add(reader);
-            replaceReader(currentlyOpenedEarly, reader, false);
+            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         finishedWriters.add(Pair.create(writer, reader));
@@ -327,38 +305,34 @@ public class SSTableRewriter
         writer = newWriter;
     }
 
-    public void finish()
-    {
-        finish(-1);
-    }
-    public void finish(long repairedAt)
-    {
-        finish(true, repairedAt);
-    }
-    public void finish(boolean cleanupOldReaders)
+    public List<SSTableReader> finish()
     {
-        finish(cleanupOldReaders, -1);
+        return finish(-1);
     }
 
     /**
      * Finishes the new file(s)
      *
-     * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
-     * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
-     * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     * Creates final files, adds the new files to the dataTracker (via replaceReader).
+     *
+     * We add them to the tracker to be able to get rid of the tmpfiles
+     *
+     * It is up to the caller to do the compacted sstables replacement
+     * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     *
      *
-     * @param cleanupOldReaders if we should replace the old files with the new ones
      * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
      *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
      *                   repair time.
      */
-    public void finish(boolean cleanupOldReaders, long repairedAt)
+    public List<SSTableReader> finish(long repairedAt)
     {
+        List<SSTableReader> finished = new ArrayList<>();
         if (writer.getFilePointer() > 0)
         {
             SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
             finished.add(reader);
-            replaceReader(currentlyOpenedEarly, reader, false);
+            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         else
@@ -373,7 +347,7 @@ public class SSTableRewriter
                 SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
                 finished.add(newReader);
                 // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                replaceReader(w.right, newReader, false);
+                replaceEarlyOpenedFile(w.right, newReader);
             }
             else
             {
@@ -384,23 +358,7 @@ public class SSTableRewriter
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
-            if (cleanupOldReaders)
-                dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
         }
-        else if (cleanupOldReaders)
-        {
-            for (SSTableReader reader : rewriting)
-            {
-                reader.markObsolete();
-                reader.releaseReference();
-            }
-        }
-        isFinished = true;
-    }
-
-    public List<SSTableReader> finished()
-    {
-        assert isFinished;
         return finished;
     }
 }
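
For callers of SSTableRewriter the contract is now: finish() returns the final
readers instead of swapping out the old ones, and the replacement is done
explicitly through the tracker. A hedged sketch of the resulting call pattern
(it mirrors CompactionTask and the tests in this mail; cfs, compacting, maxAge,
directory, scanner, controller and getWriter(..) are assumed to be set up as in
those call sites):

    SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, maxAge, false);
    rewriter.switchWriter(getWriter(cfs, directory));
    while (scanner.hasNext())
        rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
    List<SSTableReader> newSSTables = rewriter.finish();
    // only when running online; offline tools skip the tracker update
    cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, newSSTables, OperationType.COMPACTION);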

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6e1ac5f..5ed4f4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -41,6 +43,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.junit.After;
 import org.junit.Test;
@@ -89,7 +94,48 @@ public class AntiCompactionTest extends SchemaLoader
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
-    
+    @Test
+    public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.disableAutoCompaction();
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        long origSize = s.bytesOnDisk();
+        System.out.println(cfs.metric.liveDiskSpaceUsed.count());
+        Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        SSTableReader.acquireReferences(sstables);
+        CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+        long sum = 0;
+        for (SSTableReader x : cfs.getSSTables())
+            sum += x.bytesOnDisk();
+        assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+        assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
+
+    }
+
+    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < count; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        String filename = cfs.getTempSSTablePath(dir);
+
+        SSTableWriter writer = new SSTableWriter(filename,
+                0,
+                0,
+                cfs.metadata,
+                StorageService.getPartitioner(),
+                new MetadataCollector(cfs.metadata.comparator));
+
+        for (int i = 0; i < count * 5; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        return writer.closeAndOpenReader();
+    }
+
     @Test
     public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index b621c45..0a2b5a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -428,7 +428,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
         }
 
         // don't leave replaced SSTRs around to break other tests
-        cfs.getDataTracker().replaceReaders(Collections.singleton(original), Collections.singleton(sstable), true);
+        cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 7f85019..6f8ab62 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -412,7 +412,7 @@ public class SSTableReaderTest extends SchemaLoader
         }
 
         SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
-        store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement), true);
+        store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
         for (Future future : futures)
             future.get();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8b203ac..4d248bd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import static org.junit.Assert.assertEquals;
@@ -66,7 +68,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.forceBlockingFlush();
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
         ICompactionScanner scanner = scanners.scanners.get(0);
         CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -76,7 +78,7 @@ public class SSTableRewriterTest extends SchemaLoader
             AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
             writer.append(row);
         }
-        writer.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
 
         validateCFS(cfs);
 
@@ -142,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFiles() throws InterruptedException
+    public void testNumberOfFilesAndSizes() throws InterruptedException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -150,10 +152,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
-
+        long startStorageMetricsLoad = StorageMetrics.load.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -167,13 +169,23 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                 files++;
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+                assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
             }
         }
-        rewriter.finish();
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+        long sum = 0;
+        for (SSTableReader x : cfs.getSSTables())
+            sum += x.bytesOnDisk();
+        assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+        assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+        assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         // tmplink and tmp files should be gone:
+        assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         validateCFS(cfs);
     }
@@ -190,7 +202,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -206,10 +218,10 @@ public class SSTableRewriterTest extends SchemaLoader
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
             }
         }
-        rewriter.finish(false);
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        assertEquals(files, sstables.size());
         assertEquals(files + 1, cfs.getSSTables().size());
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
@@ -226,11 +238,12 @@ public class SSTableRewriterTest extends SchemaLoader
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
+        long startSize = cfs.metric.liveDiskSpaceUsed.count();
         DecoratedKey origFirst = s.first;
         DecoratedKey origLast = s.last;
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -248,6 +261,7 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         rewriter.abort();
         Thread.sleep(1000);
+        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
         assertEquals(1, cfs.getSSTables().size());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@ -270,7 +284,7 @@ public class SSTableRewriterTest extends SchemaLoader
         DecoratedKey origLast = s.last;
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -313,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -331,7 +345,8 @@ public class SSTableRewriterTest extends SchemaLoader
             if (files == 3)
             {
                 //testing to finish when we have nothing written in the new file
-                rewriter.finish();
+                List<SSTableReader> sstables = rewriter.finish();
+                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
                 break;
             }
         }
@@ -353,7 +368,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -369,7 +384,8 @@ public class SSTableRewriterTest extends SchemaLoader
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
             }
         }
-        rewriter.finish();
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         cfs.truncateBlocking();
@@ -389,7 +405,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(1000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -406,8 +422,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 files++;
             }
         }
-        rewriter.finish();
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+        assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);


[3/5] Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0000000,4d248bd..58803c3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -1,0 -1,490 +1,504 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
 -import java.util.Collection;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import com.google.common.collect.Sets;
++import org.junit.After;
++import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+ import org.apache.cassandra.db.compaction.CompactionController;
+ import org.apache.cassandra.db.compaction.ICompactionScanner;
+ import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+ import org.apache.cassandra.db.compaction.OperationType;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableWriter;
++import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.metrics.StorageMetrics;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class SSTableRewriterTest extends SchemaLoader
+ {
 -    private static final String KEYSPACE = "Keyspace1";
++    private static final String KEYSPACE = "SSTableRewriterTest";
+     private static final String CF = "Standard1";
++
++    @BeforeClass
++    public static void defineSchema() throws ConfigurationException
++    {
++        SchemaLoader.prepareServer();
++        SchemaLoader.createKeyspace(KEYSPACE,
++                SimpleStrategy.class,
++                KSMetaData.optsWithRF(1),
++                SchemaLoader.standardCFMD(KEYSPACE, CF));
++    }
++
++    @After
++    public void truncateCF()
++    {
++        Keyspace keyspace = Keyspace.open(KEYSPACE);
++        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
++        store.truncateBlocking();
++    }
++
++
+     @Test
+     public void basicTest() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         for (int j = 0; j < 100; j ++)
+         {
+             ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+             Mutation rm = new Mutation(KEYSPACE, key);
+             rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+             rm.apply();
+         }
+         cfs.forceBlockingFlush();
+         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+         assertEquals(1, sstables.size());
+         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+         AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+         ICompactionScanner scanner = scanners.scanners.get(0);
+         CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+         writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+         while(scanner.hasNext())
+         {
+             AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+             writer.append(row);
+         }
+         cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
+ 
+         validateCFS(cfs);
+ 
+     }
+ 
+ 
+     @Test
+     public void testFileRemoval() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
 -        for (int i = 0; i < 1000; i++)
 -            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
++        for (int i = 0; i < 100; i++)
++            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+         File dir = cfs.directories.getDirectoryForNewSSTables();
+         SSTableWriter writer = getWriter(cfs, dir);
 -
+         for (int i = 0; i < 500; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         SSTableReader s = writer.openEarly(1000);
+         assertFileCounts(dir.list(), 2, 3);
+         for (int i = 500; i < 1000; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         SSTableReader s2 = writer.openEarly(1000);
+         assertTrue(s != s2);
+         assertFileCounts(dir.list(), 2, 3);
+         s.markObsolete();
+         s.releaseReference();
+         Thread.sleep(1000);
+         assertFileCounts(dir.list(), 0, 3);
+         writer.abort(false);
+         Thread.sleep(1000);
+         assertFileCounts(dir.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+     @Test
+     public void testFileRemovalNoAbort() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         for (int i = 0; i < 1000; i++)
+             cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+         File dir = cfs.directories.getDirectoryForNewSSTables();
+         SSTableWriter writer = getWriter(cfs, dir);
+ 
+         for (int i = 0; i < 500; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         SSTableReader s = writer.openEarly(1000);
+         //assertFileCounts(dir.list(), 2, 3);
+         for (int i = 500; i < 1000; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         writer.closeAndOpenReader();
+         s.markObsolete();
+         s.releaseReference();
+         Thread.sleep(1000);
+         assertFileCounts(dir.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+ 
+     @Test
+     public void testNumberOfFilesAndSizes() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+         long startStorageMetricsLoad = StorageMetrics.load.count();
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                 assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+                 assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+ 
+             }
+         }
+         List<SSTableReader> sstables = rewriter.finish();
+         cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+         long sum = 0;
+         for (SSTableReader x : cfs.getSSTables())
+             sum += x.bytesOnDisk();
+         assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+         assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+         assertEquals(files, sstables.size());
+         assertEquals(files, cfs.getSSTables().size());
+         Thread.sleep(1000);
+         // tmplink and tmp files should be gone:
+         assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+     @Test
+     public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+ 
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+             }
+         }
+         List<SSTableReader> sstables = rewriter.finish();
+         assertEquals(files, sstables.size());
+         assertEquals(files + 1, cfs.getSSTables().size());
+         cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+         assertEquals(files, cfs.getSSTables().size());
+         Thread.sleep(1000);
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+ 
+     @Test
+     public void testNumberOfFiles_abort() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+         long startSize = cfs.metric.liveDiskSpaceUsed.count();
+         DecoratedKey origFirst = s.first;
+         DecoratedKey origLast = s.last;
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+             }
+         }
+         rewriter.abort();
+         Thread.sleep(1000);
+         assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
+         assertEquals(1, cfs.getSSTables().size());
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+         validateCFS(cfs);
+ 
+     }
+ 
+     @Test
+     public void testNumberOfFiles_abort2() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+ 
+         DecoratedKey origFirst = s.first;
+         DecoratedKey origLast = s.last;
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+             }
+             if (files == 3)
+             {
+                 //testing to abort when we have nothing written in the new file
+                 rewriter.abort();
+                 break;
+             }
+         }
+         Thread.sleep(1000);
+         assertEquals(1, cfs.getSSTables().size());
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ 
+         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+         validateCFS(cfs);
+     }
+ 
+     @Test
+     public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+ 
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+             }
+             if (files == 3)
+             {
+                 //testing to finish when we have nothing written in the new file
+                 List<SSTableReader> sstables = rewriter.finish();
+                 cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+                 break;
+             }
+         }
+         Thread.sleep(1000);
+         assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+     @Test
+     public void testNumberOfFiles_truncate() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+ 
+         SSTableReader s = writeFile(cfs, 1000);
+         cfs.addSSTable(s);
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(10000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+             {
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+             }
+         }
+         List<SSTableReader> sstables = rewriter.finish();
+         cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+         Thread.sleep(1000);
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         cfs.truncateBlocking();
+         validateCFS(cfs);
+     }
+ 
+     @Test
+     public void testSmallFiles() throws InterruptedException
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+         cfs.truncateBlocking();
+         cfs.disableAutoCompaction();
+ 
+         SSTableReader s = writeFile(cfs, 400);
+         DecoratedKey origFirst = s.first;
+         cfs.addSSTable(s);
+         Set<SSTableReader> compacting = Sets.newHashSet(s);
+         SSTableRewriter.overrideOpenInterval(1000000);
+         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ 
+         ICompactionScanner scanner = s.getScanner();
+         CompactionController controller = new CompactionController(cfs, compacting, 0);
+         int files = 1;
+         while(scanner.hasNext())
+         {
+             rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+             if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+             {
+                 assertEquals(1, cfs.getSSTables().size()); // we don't open small files early ...
+                 assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                 files++;
+             }
+         }
+         List<SSTableReader> sstables = rewriter.finish();
+         cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+         assertEquals(files, sstables.size());
+         assertEquals(files, cfs.getSSTables().size());
+         Thread.sleep(1000);
+         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+         validateCFS(cfs);
+     }
+ 
+     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+     {
+         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+         for (int i = 0; i < count / 100; i++)
+             cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+         File dir = cfs.directories.getDirectoryForNewSSTables();
+         String filename = cfs.getTempSSTablePath(dir);
+ 
 -        SSTableWriter writer = new SSTableWriter(filename,
 -                0,
 -                0,
 -                cfs.metadata,
 -                StorageService.getPartitioner(),
 -                new MetadataCollector(cfs.metadata.comparator));
++        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
+ 
+         for (int i = 0; i < count * 5; i++)
+             writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+         return writer.closeAndOpenReader();
+     }
+ 
+     private void validateCFS(ColumnFamilyStore cfs)
+     {
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             assertFalse(sstable.isMarkedCompacted());
+             assertEquals(1, sstable.referenceCount());
+         }
+         assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+     }
+ 
+ 
+     private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+     {
+         int tmplinkcount = 0;
+         int tmpcount = 0;
+         for (String f : files)
+         {
 -            if (f.contains("-tmplink-"))
++            if (f.contains("tmplink-"))
+                 tmplinkcount++;
 -            if (f.contains("-tmp-"))
++            if (f.contains("tmp-"))
+                 tmpcount++;
+         }
+         assertEquals(expectedtmplinkCount, tmplinkcount);
+         assertEquals(expectedtmpCount, tmpcount);
+     }
+ 
+     private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+     {
+         String filename = cfs.getTempSSTablePath(directory);
 -        return new SSTableWriter(filename,
 -                                 0,
 -                                 0,
 -                                 cfs.metadata,
 -                                 StorageService.getPartitioner(),
 -                                 new MetadataCollector(cfs.metadata.comparator));
++        return SSTableWriter.create(filename, 0, 0);
+     }
+ }
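
The tests above all drive the same rewriter lifecycle; as a rough sketch of that flow, condensed from basicTest (getWriter is the test's own helper shown at the bottom of the class, the maxAge of 1000 is the value the tests pass, and the imports are those already listed at the top of the file), it might look like:

    // Rewrite every row of the current sstables through an SSTableRewriter, then let
    // the DataTracker replace the old readers with the finished ones.
    private void rewriteAll(ColumnFamilyStore cfs) throws Exception
    {
        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
        SSTableRewriter rewriter = new SSTableRewriter(cfs, sstables, 1000, false); // maxAge/isOffline as in basicTest
        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
        {
            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
            rewriter.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
            for (ICompactionScanner scanner : scanners.scanners)
                while (scanner.hasNext())
                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
            // finish() opens the replacement readers; swapping them into the tracker is what lets
            // leftover tmp/tmplink files from an unfinished compaction be removed (CASSANDRA-8124).
            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, rewriter.finish(), OperationType.COMPACTION);
        }
    }

On the failure path the tests instead call rewriter.abort(), and the assertFileCounts checks above verify that no tmp or tmplink files survive it.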


[5/5] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/Upgrader.java
	src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
	test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f59629c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f59629c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f59629c

Branch: refs/heads/trunk
Commit: 0f59629ce280ba2a74d65a7719dde7cf79923f05
Parents: e60a06c 5160c91
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Nov 3 17:02:10 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 17:02:10 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/DataTracker.java    | 109 ++--
 .../db/compaction/CompactionManager.java        |  29 +-
 .../cassandra/db/compaction/CompactionTask.java |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  12 +-
 .../cassandra/db/compaction/Upgrader.java       |  31 +-
 .../io/sstable/IndexSummaryManager.java         |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 160 +++---
 .../io/sstable/format/SSTableReader.java        |   6 +
 .../db/compaction/AntiCompactionTest.java       |  42 +-
 .../io/sstable/IndexSummaryManagerTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         | 504 +++++++++++++++++++
 13 files changed, 755 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3a8ada2,32083cc..9754110
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,6 +1,39 @@@
 +3.0
 + * Mark sstables as repaired after full repair (CASSANDRA-7586) 
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * improve concurrency of repair (CASSANDRA-6455)
 +
 +
  2.1.2
+  * Refactor how we track live size (CASSANDRA-7852)
+  * Make sure unfinished compaction files are removed (CASSANDRA-8124)
   * Fix shutdown when run as Windows service (CASSANDRA-8136)
   * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
   * Fix race in RecoveryManagerTest (CASSANDRA-8176)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3ee36cd,84c3cb5..cccb7f9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1045,76 -987,63 +1046,78 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
 +
 +     
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
  
 -            File destination = cfs.directories.getDirectoryForNewSSTables();
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 +        File destination = cfs.directories.getDirectoryForNewSSTables();
-         SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
-         SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
++        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
++        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
  
 -            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 -            {
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 +        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
 +             CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 +        {
 +            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
  
 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                while(iter.hasNext())
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 +
 +            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
 +            Iterator<AbstractCompactedRow> iter = ci.iterator();
 +            while(iter.hasNext())
 +            {
 +                AbstractCompactedRow row = iter.next();
 +                // if current range from sstable is repaired, save it into the new repaired sstable
 +                if (Range.isInRanges(row.key.getToken(), ranges))
                  {
 -                    AbstractCompactedRow row = iter.next();
 -                    // if current range from sstable is repaired, save it into the new repaired sstable
 -                    if (Range.isInRanges(row.key.getToken(), ranges))
 -                    {
 -                        repairedSSTableWriter.append(row);
 -                        repairedKeyCount++;
 -                    }
 -                    // otherwise save into the new 'non-repaired' table
 -                    else
 -                    {
 -                        unRepairedSSTableWriter.append(row);
 -                        unrepairedKeyCount++;
 -                    }
 +                    repairedSSTableWriter.append(row);
 +                    repairedKeyCount++;
 +                }
 +                // otherwise save into the new 'non-repaired' table
 +                else
 +                {
 +                    unRepairedSSTableWriter.append(row);
 +                    unrepairedKeyCount++;
                  }
 -                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 -                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 -                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
 -            }
 -            catch (Throwable e)
 -            {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-             repairedSSTableWriter.finish(false, repairedAt);
-             unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
-             // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
++            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
++            anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
++            anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
++            cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
++
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
-             return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
++            return anticompactedSSTables.size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +            repairedSSTableWriter.abort();
 +            unRepairedSSTableWriter.abort();
 +        }
 +        return 0;
      }
  
      /**

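For reference, the row routing that the merged anticompaction hunk above implements reduces to roughly the following sketch (variable names are taken from the hunk itself; the surrounding try-with-resources, logging and error handling are omitted):

    // Rows whose key falls in the repaired ranges go to the repaired rewriter, the rest to the
    // unrepaired one; both result sets then replace the originals in a single tracker update.
    while (iter.hasNext())
    {
        AbstractCompactedRow row = iter.next();
        if (Range.isInRanges(row.key.getToken(), ranges))
            repairedSSTableWriter.append(row);
        else
            unRepairedSSTableWriter.append(row);
    }
    List<SSTableReader> anticompactedSSTables = new ArrayList<>();
    anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
    anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
    cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
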
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 584ff38,b442482..808626b
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -158,9 -150,9 +158,9 @@@ public class CompactionTask extends Abs
  
              try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
              {
 -                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
 +                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
                  Iterator<AbstractCompactedRow> iter = ci.iterator();
- 
+                 List<SSTableReader> newSStables;
                  // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                  // replace the old entries.  Track entries to preheat here until then.
                  long minRepairedAt = getMinRepairedAt(actuallyCompact);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Upgrader.java
index c9e7034,39f668d..52739de
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@@ -21,8 -21,9 +21,9 @@@ import java.io.File
  import java.util.*;
  
  import com.google.common.base.Throwables;
+ import com.google.common.collect.Sets;
  
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.*;
@@@ -69,29 -67,24 +68,24 @@@ public class Upgrade
  
          // Get the max timestamp of the precompacted sstables
          // and adds generation of live ancestors
-         // -- note that we always only have one SSTable in toUpgrade here:
-         for (SSTableReader sstable : toUpgrade)
+         sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+         for (Integer i : sstable.getAncestors())
          {
-             sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
-             for (Integer i : sstable.getAncestors())
-             {
-                 if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                     sstableMetadataCollector.addAncestor(i);
-             }
-             sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+             if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                 sstableMetadataCollector.addAncestor(i);
          }
- 
+         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
 -        return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
 +        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
      }
  
      public void upgrade()
      {
          outputHandler.output("Upgrading " + sstable);
- 
-         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-         try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+         Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+         try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
          {
 -            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
 +            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
              writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
              while (iter.hasNext())
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f59629c/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 18825cb,4d5a06f..f3d08a6
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@@ -34,11 -35,9 +35,11 @@@ import org.apache.cassandra.db.DataTrac
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.RowIndexEntry;
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
- import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.utils.CLibrary;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  
  /**
   * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb