You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/03 17:03:26 UTC

[2/2] git commit: Refactor how we track live size

Refactor how we track live size

Patch by marcuse; reviewed by yukim for CASSANDRA-7852


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5160c916
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5160c916
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5160c916

Branch: refs/heads/cassandra-2.1
Commit: 5160c916c90886f69023ddba0078a624e5cf202d
Parents: 9c316e7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Nov 3 16:39:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    | 109 ++++++++++++-------
 .../db/compaction/CompactionManager.java        |  26 ++---
 .../cassandra/db/compaction/CompactionTask.java |   7 +-
 .../cassandra/db/compaction/Scrubber.java       |  12 +-
 .../cassandra/db/compaction/Upgrader.java       |  31 +++---
 .../io/sstable/IndexSummaryManager.java         |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  90 ++++-----------
 .../db/compaction/AntiCompactionTest.java       |  48 +++++++-
 .../io/sstable/IndexSummaryManagerTest.java     |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../io/sstable/SSTableRewriterTest.java         |  57 ++++++----
 12 files changed, 219 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 681d616..32083cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Refactor how we track live size (CASSANDRA-7852)
  * Make sure unfinished compaction files are removed (CASSANDRA-8124)
  * Fix shutdown when run as Windows service (CASSANDRA-8136)
  * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7393323..7df2b75 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -254,33 +254,36 @@ public class DataTracker
 
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
-        replace(sstables, Collections.<SSTableReader>emptyList());
+        removeSSTablesFromTracker(sstables);
+        releaseReferences(sstables, false);
         notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
     }
 
+    /**
+     *
+     * @param oldSSTables
+     * @param allReplacements
+     * @param compactionType
+     */
     // note that this DOES NOT insert the replacement sstables, it only removes the old sstables and notifies any listeners
     // that they have been replaced by the provided sstables, which must have been performed by an earlier replaceReaders() call
-    public void markCompactedSSTablesReplaced(Collection<SSTableReader> sstables, Collection<SSTableReader> allReplacements, OperationType compactionType)
+    public void markCompactedSSTablesReplaced(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> allReplacements, OperationType compactionType)
     {
-        replace(sstables, Collections.<SSTableReader>emptyList());
-        notifySSTablesChanged(sstables, allReplacements, compactionType);
-        for (SSTableReader sstable : allReplacements)
-        {
-            long bytesOnDisk = sstable.bytesOnDisk();
-            cfstore.metric.totalDiskSpaceUsed.inc(bytesOnDisk);
-            cfstore.metric.liveDiskSpaceUsed.inc(bytesOnDisk);
-        }
+        removeSSTablesFromTracker(oldSSTables);
+        releaseReferences(oldSSTables, false);
+        notifySSTablesChanged(oldSSTables, allReplacements, compactionType);
+        addNewSSTablesSize(allReplacements);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)
     {
-        replace(Collections.<SSTableReader>emptyList(), sstables);
+        addSSTablesToTracker(sstables);
         // no notifications or backup necessary
     }
 
     public void addSSTables(Collection<SSTableReader> sstables)
     {
-        replace(Collections.<SSTableReader>emptyList(), sstables);
+        addSSTablesToTracker(sstables);
         for (SSTableReader sstable : sstables)
         {
             maybeIncrementallyBackup(sstable);
@@ -289,6 +292,32 @@ public class DataTracker
     }
 
     /**
+     * Replaces existing sstables with new instances, makes sure compaction strategies have the correct instance
+     *
+     * @param toReplace
+     * @param replaceWith
+     */
+    public void replaceWithNewInstances(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        replaceReaders(toReplace, replaceWith, true);
+    }
+
+    /**
+     * Adds the early opened files to the data tracker, but does not tell compaction strategies about it
+     *
+     * note that we dont track the live size of these sstables
+     * @param toReplace
+     * @param replaceWith
+     */
+    public void replaceEarlyOpenedFiles(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith)
+    {
+        for (SSTableReader s : toReplace)
+            assert s.openReason.equals(SSTableReader.OpenReason.EARLY);
+        // note that we can replace an early opened file with a real one
+        replaceReaders(toReplace, replaceWith, false);
+    }
+
+    /**
      * removes all sstables that are not busy compacting.
      */
     public void unreferenceSSTables()
@@ -310,7 +339,8 @@ public class DataTracker
             return;
         }
         notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
-        postReplace(notCompacting, Collections.<SSTableReader>emptySet(), true);
+        removeOldSSTablesSize(notCompacting);
+        releaseReferences(notCompacting, true);
     }
 
     /**
@@ -344,11 +374,11 @@ public class DataTracker
     void init()
     {
         view.set(new View(
-                         ImmutableList.of(new Memtable(cfstore)),
-                         ImmutableList.<Memtable>of(),
-                         Collections.<SSTableReader>emptySet(),
-                         Collections.<SSTableReader>emptySet(),
-                         SSTableIntervalTree.empty()));
+                ImmutableList.of(new Memtable(cfstore)),
+                ImmutableList.<Memtable>of(),
+                Collections.<SSTableReader>emptySet(),
+                Collections.<SSTableReader>emptySet(),
+                SSTableIntervalTree.empty()));
     }
 
     /**
@@ -358,7 +388,7 @@ public class DataTracker
      * @param oldSSTables replaced readers
      * @param newSSTables replacement readers
      */
-    public void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
+    private void replaceReaders(Collection<SSTableReader> oldSSTables, Collection<SSTableReader> newSSTables, boolean notify)
     {
         View currentView, newView;
         do
@@ -369,7 +399,7 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
 
         if (!oldSSTables.isEmpty() && notify)
-            notifySSTablesChanged(oldSSTables, newSSTables, OperationType.COMPACTION);
+            notifySSTablesChanged(oldSSTables, newSSTables, OperationType.UNKNOWN);
 
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
@@ -378,29 +408,28 @@ public class DataTracker
             sstable.releaseReference();
     }
 
-    private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+    private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)
     {
-        if (!cfstore.isValid())
-        {
-            removeOldSSTablesSize(replacements, false);
-            replacements = Collections.emptyList();
-        }
-
         View currentView, newView;
         do
         {
             currentView = view.get();
-            newView = currentView.replace(oldSSTables, replacements);
+            newView = currentView.replace(oldSSTables, Collections.<SSTableReader>emptyList());
         }
         while (!view.compareAndSet(currentView, newView));
-
-        postReplace(oldSSTables, replacements, false);
+        removeOldSSTablesSize(oldSSTables);
     }
 
-    private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements, boolean tolerateCompacted)
+    private void addSSTablesToTracker(Collection<SSTableReader> sstables)
     {
-        addNewSSTablesSize(replacements);
-        removeOldSSTablesSize(oldSSTables, tolerateCompacted);
+        View currentView, newView;
+        do
+        {
+            currentView = view.get();
+            newView = currentView.replace(Collections.<SSTableReader>emptyList(), sstables);
+        }
+        while (!view.compareAndSet(currentView, newView));
+        addNewSSTablesSize(sstables);
     }
 
     private void addNewSSTablesSize(Iterable<SSTableReader> newSSTables)
@@ -418,7 +447,7 @@ public class DataTracker
         }
     }
 
-    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+    private void removeOldSSTablesSize(Iterable<SSTableReader> oldSSTables)
     {
         for (SSTableReader sstable : oldSSTables)
         {
@@ -428,13 +457,15 @@ public class DataTracker
             long size = sstable.bytesOnDisk();
             StorageMetrics.load.dec(size);
             cfstore.metric.liveDiskSpaceUsed.dec(size);
+        }
+    }
 
-            // tolerateCompacted will be true when the CFS is no longer valid (dropped). If there were ongoing
-            // compactions when it was invalidated, sstables may already be marked compacted, so we should
-            // tolerate that (see CASSANDRA-5957)
+    private void releaseReferences(Iterable<SSTableReader> oldSSTables, boolean tolerateCompacted)
+    {
+        for (SSTableReader sstable : oldSSTables)
+        {
             boolean firstToCompact = sstable.markObsolete();
-            assert (tolerateCompacted || firstToCompact) : sstable + " was already marked compacted";
-
+            assert tolerateCompacted || firstToCompact : sstable + " was already marked compacted";
             sstable.releaseReference();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 51f45b8..84c3cb5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -688,8 +688,9 @@ public class CompactionManager implements CompactionManagerMBean
         CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
         metrics.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
-
+        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+        List<SSTableReader> finished;
         try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -711,7 +712,8 @@ public class CompactionManager implements CompactionManagerMBean
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
             cfs.indexManager.flushIndexesBlocking();
 
-            writer.finish();
+            finished = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
         }
         catch (Throwable e)
         {
@@ -724,17 +726,16 @@ public class CompactionManager implements CompactionManagerMBean
             metrics.finishCompaction(ci);
         }
 
-        List<SSTableReader> results = writer.finished();
-        if (!results.isEmpty())
+        if (!finished.isEmpty())
         {
             String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
             long startsize = sstable.onDiskLength();
             long endsize = 0;
-            for (SSTableReader newSstable : results)
+            for (SSTableReader newSstable : finished)
                 endsize += newSstable.onDiskLength();
             double ratio = (double) endsize / (double) startsize;
-            logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+            logger.info(String.format(format, finished.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
         }
 
     }
@@ -994,8 +995,8 @@ public class CompactionManager implements CompactionManagerMBean
             sstableAsSet.add(sstable);
 
             File destination = cfs.directories.getDirectoryForNewSSTables();
-            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
-            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
+            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
@@ -1024,11 +1025,10 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
                 // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
-                repairedSSTableWriter.finish(false, repairedAt);
-                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
                 // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
-                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
-                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
+                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
+                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
             }
             catch (Throwable e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d2ae04a..b442482 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -152,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
             {
                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
-
+                List<SSTableReader> newSStables;
                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
                 // replace the old entries.  Track entries to preheat here until then.
                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
@@ -161,7 +161,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (collector != null)
                     collector.beginCompaction(ci);
                 long lastCheckObsoletion = start;
-                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
                 try
                 {
                     if (!iter.hasNext())
@@ -197,7 +197,7 @@ public class CompactionTask extends AbstractCompactionTask
                     }
 
                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-                    writer.finish(false);
+                    newSStables = writer.finish();
                 }
                 catch (Throwable t)
                 {
@@ -217,7 +217,6 @@ public class CompactionTask extends AbstractCompactionTask
                 }
 
                 Collection<SSTableReader> oldSStables = this.sstables;
-                List<SSTableReader> newSStables = writer.finished();
                 if (!offline)
                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b3d098d..0cd71f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -22,6 +22,7 @@ import java.io.*;
 import java.util.*;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.*;
@@ -107,7 +108,8 @@ public class Scrubber implements Closeable
     public void scrub()
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
-        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(Collections.singleton(sstable)), sstable.maxDataAge, OperationType.SCRUB, isOffline);
+        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -256,9 +258,11 @@ public class Scrubber implements Closeable
             }
 
             // finish obsoletes the old sstable
-            writer.finish(!isOffline, badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
-            if (!writer.finished().isEmpty())
-                newSstable = writer.finished().get(0);
+            List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+            if (!finished.isEmpty())
+                newSstable = finished.get(0);
+            if (!isOffline)
+                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index f102fef..39f668d 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -35,7 +36,6 @@ public class Upgrader
 {
     private final ColumnFamilyStore cfs;
     private final SSTableReader sstable;
-    private final Set<SSTableReader> toUpgrade;
     private final File directory;
 
     private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
@@ -49,7 +49,6 @@ public class Upgrader
     {
         this.cfs = cfs;
         this.sstable = sstable;
-        this.toUpgrade = new HashSet<>(Collections.singleton(sstable));
         this.outputHandler = outputHandler;
 
         this.directory = new File(sstable.getFilename()).getParentFile();
@@ -57,8 +56,8 @@ public class Upgrader
         this.controller = new UpgradeController(cfs);
 
         this.strategy = cfs.getCompactionStrategy();
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(toUpgrade));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(this.toUpgrade) / strategy.getMaxSSTableBytes());
+        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable)));
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategy.getMaxSSTableBytes());
         this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
     }
 
@@ -68,27 +67,22 @@ public class Upgrader
 
         // Get the max timestamp of the precompacted sstables
         // and adds generation of live ancestors
-        // -- note that we always only have one SSTable in toUpgrade here:
-        for (SSTableReader sstable : toUpgrade)
+        sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
+        for (Integer i : sstable.getAncestors())
         {
-            sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
-            for (Integer i : sstable.getAncestors())
-            {
-                if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
-                    sstableMetadataCollector.addAncestor(i);
-            }
-            sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
+            if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+                sstableMetadataCollector.addAncestor(i);
         }
-
+        sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
         return new SSTableWriter(cfs.getTempSSTablePath(directory), estimatedRows, repairedAt, cfs.metadata, cfs.partitioner, sstableMetadataCollector);
     }
 
     public void upgrade()
     {
         outputHandler.output("Upgrading " + sstable);
-
-        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
+        Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
+        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
         {
             Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -98,7 +92,8 @@ public class Upgrader
                 writer.append(row);
             }
 
-            writer.finish();
+            List<SSTableReader> sstables = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(toUpgrade, sstables, OperationType.UPGRADE_SSTABLES);
             outputHandler.output("Upgrade of " + sstable + " complete.");
 
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index cc60b4d..65b25a4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -416,7 +416,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
 
         for (DataTracker tracker : replacedByTracker.keySet())
         {
-            tracker.replaceReaders(replacedByTracker.get(tracker), replacementsByTracker.get(tracker), true);
+            tracker.replaceWithNewInstances(replacedByTracker.get(tracker), replacementsByTracker.get(tracker));
             newSSTables.addAll(replacementsByTracker.get(tracker));
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2c9fe7e..4d5a06f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.sstable;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -36,7 +35,6 @@ import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -67,8 +65,6 @@ public class SSTableRewriter
         preemptiveOpenInterval = interval;
     }
 
-    private boolean isFinished = false;
-
     @VisibleForTesting
     static void overrideOpenInterval(long size)
     {
@@ -86,16 +82,14 @@ public class SSTableRewriter
     private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
-    private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
     private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
     private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
-    private final OperationType rewriteType; // the type of rewrite/compaction being performed
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
 
-    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, OperationType rewriteType, boolean isOffline)
+    public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
         this.rewriting = rewriting;
         for (SSTableReader sstable : rewriting)
@@ -106,7 +100,6 @@ public class SSTableRewriter
         this.dataTracker = cfs.getDataTracker();
         this.cfs = cfs;
         this.maxAge = maxAge;
-        this.rewriteType = rewriteType;
         this.isOffline = isOffline;
     }
 
@@ -147,28 +140,18 @@ public class SSTableRewriter
     // attempts to append the row, if fails resets the writer position
     public RowIndexEntry tryAppend(AbstractCompactedRow row)
     {
-        mark();
+        writer.mark();
         try
         {
             return append(row);
         }
         catch (Throwable t)
         {
-            resetAndTruncate();
+            writer.resetAndTruncate();
             throw t;
         }
     }
 
-    private void mark()
-    {
-        writer.mark();
-    }
-
-    private void resetAndTruncate()
-    {
-        writer.resetAndTruncate();
-    }
-
     private void maybeReopenEarly(DecoratedKey key)
     {
         if (FBUtilities.isUnix() && writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
@@ -186,7 +169,7 @@ public class SSTableRewriter
                 SSTableReader reader = writer.openEarly(maxAge);
                 if (reader != null)
                 {
-                    replaceReader(currentlyOpenedEarly, reader, false);
+                    replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
                     currentlyOpenedEarly = reader;
                     currentlyOpenedEarlyAt = writer.getFilePointer();
                     moveStarts(reader, Functions.constant(reader.last), false);
@@ -222,7 +205,7 @@ public class SSTableRewriter
         // releases reference in replaceReaders
         if (!isOffline)
         {
-            dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
+            dataTracker.replaceEarlyOpenedFiles(close, Collections.<SSTableReader>emptyList());
             dataTracker.unmarkCompacting(close);
         }
     }
@@ -276,12 +259,14 @@ public class SSTableRewriter
                 }));
             }
         }
-        replaceReaders(toReplace, replaceWith, true);
+        cfs.getDataTracker().replaceWithNewInstances(toReplace, replaceWith);
         rewriting.removeAll(toReplace);
         rewriting.addAll(replaceWith);
     }
 
-    private void replaceReader(SSTableReader toReplace, SSTableReader replaceWith, boolean notify)
+
+
+    private void replaceEarlyOpenedFile(SSTableReader toReplace, SSTableReader replaceWith)
     {
         if (isOffline)
             return;
@@ -296,14 +281,7 @@ public class SSTableRewriter
             dataTracker.markCompacting(Collections.singleton(replaceWith));
             toReplaceSet = Collections.emptySet();
         }
-        replaceReaders(toReplaceSet, Collections.singleton(replaceWith), notify);
-    }
-
-    private void replaceReaders(Collection<SSTableReader> toReplace, Collection<SSTableReader> replaceWith, boolean notify)
-    {
-        if (isOffline)
-            return;
-        dataTracker.replaceReaders(toReplace, replaceWith, notify);
+        dataTracker.replaceEarlyOpenedFiles(toReplaceSet, Collections.singleton(replaceWith));
     }
 
     public void switchWriter(SSTableWriter newWriter)
@@ -318,7 +296,7 @@ public class SSTableRewriter
         if (reader != null)
         {
             finishedOpenedEarly.add(reader);
-            replaceReader(currentlyOpenedEarly, reader, false);
+            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         finishedWriters.add(Pair.create(writer, reader));
@@ -327,38 +305,34 @@ public class SSTableRewriter
         writer = newWriter;
     }
 
-    public void finish()
-    {
-        finish(-1);
-    }
-    public void finish(long repairedAt)
-    {
-        finish(true, repairedAt);
-    }
-    public void finish(boolean cleanupOldReaders)
+    public List<SSTableReader> finish()
     {
-        finish(cleanupOldReaders, -1);
+        return finish(-1);
     }
 
     /**
      * Finishes the new file(s)
      *
-     * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
-     * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
-     * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     * Creates final files, adds the new files to the dataTracker (via replaceEarlyOpenedFile).
+     *
+     * We add them to the tracker so that the tmp files can be cleaned up
+     *
+     * It is up to the caller to do the compacted sstables replacement
+     * gymnastics (i.e., call DataTracker#markCompactedSSTablesReplaced(..))
+     *
      *
-     * @param cleanupOldReaders if we should replace the old files with the new ones
      * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
      *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
      *                   repair time.
      */
-    public void finish(boolean cleanupOldReaders, long repairedAt)
+    public List<SSTableReader> finish(long repairedAt)
     {
+        List<SSTableReader> finished = new ArrayList<>();
         if (writer.getFilePointer() > 0)
         {
             SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
             finished.add(reader);
-            replaceReader(currentlyOpenedEarly, reader, false);
+            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         else
@@ -373,7 +347,7 @@ public class SSTableRewriter
                 SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
                 finished.add(newReader);
                 // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                replaceReader(w.right, newReader, false);
+                replaceEarlyOpenedFile(w.right, newReader);
             }
             else
             {
@@ -384,23 +358,7 @@ public class SSTableRewriter
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
-            if (cleanupOldReaders)
-                dataTracker.markCompactedSSTablesReplaced(rewriting, finished, rewriteType);
         }
-        else if (cleanupOldReaders)
-        {
-            for (SSTableReader reader : rewriting)
-            {
-                reader.markObsolete();
-                reader.releaseReference();
-            }
-        }
-        isFinished = true;
-    }
-
-    public List<SSTableReader> finished()
-    {
-        assert isFinished;
         return finished;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6e1ac5f..5ed4f4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -41,6 +43,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.junit.After;
 import org.junit.Test;
@@ -89,7 +94,48 @@ public class AntiCompactionTest extends SchemaLoader
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
-    
+    @Test
+    public void antiCompactionSizeTest() throws ExecutionException, InterruptedException, IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.disableAutoCompaction();
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        long origSize = s.bytesOnDisk();
+        System.out.println(cfs.metric.liveDiskSpaceUsed.count());
+        Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        SSTableReader.acquireReferences(sstables);
+        CompactionManager.instance.performAnticompaction(cfs, Arrays.asList(range), sstables, 12345);
+        long sum = 0;
+        for (SSTableReader x : cfs.getSSTables())
+            sum += x.bytesOnDisk();
+        assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+        assertEquals(origSize, cfs.metric.liveDiskSpaceUsed.count(), 100000);
+
+    }
+
+    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < count; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        String filename = cfs.getTempSSTablePath(dir);
+
+        SSTableWriter writer = new SSTableWriter(filename,
+                0,
+                0,
+                cfs.metadata,
+                StorageService.getPartitioner(),
+                new MetadataCollector(cfs.metadata.comparator));
+
+        for (int i = 0; i < count * 5; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        return writer.closeAndOpenReader();
+    }
+
     @Test
     public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index b621c45..0a2b5a6 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -428,7 +428,7 @@ public class IndexSummaryManagerTest extends SchemaLoader
         }
 
         // don't leave replaced SSTRs around to break other tests
-        cfs.getDataTracker().replaceReaders(Collections.singleton(original), Collections.singleton(sstable), true);
+        cfs.getDataTracker().replaceWithNewInstances(Collections.singleton(original), Collections.singleton(sstable));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 7f85019..6f8ab62 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -412,7 +412,7 @@ public class SSTableReaderTest extends SchemaLoader
         }
 
         SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(store, 1);
-        store.getDataTracker().replaceReaders(Arrays.asList(sstable), Arrays.asList(replacement), true);
+        store.getDataTracker().replaceWithNewInstances(Arrays.asList(sstable), Arrays.asList(replacement));
         for (Future future : futures)
             future.get();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5160c916/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8b203ac..4d248bd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import static org.junit.Assert.assertEquals;
@@ -66,7 +68,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.forceBlockingFlush();
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
         ICompactionScanner scanner = scanners.scanners.get(0);
         CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -76,7 +78,7 @@ public class SSTableRewriterTest extends SchemaLoader
             AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
             writer.append(row);
         }
-        writer.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
 
         validateCFS(cfs);
 
@@ -142,7 +144,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
 
     @Test
-    public void testNumberOfFiles() throws InterruptedException
+    public void testNumberOfFilesAndSizes() throws InterruptedException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -150,10 +152,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
-
+        long startStorageMetricsLoad = StorageMetrics.load.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -167,13 +169,23 @@ public class SSTableRewriterTest extends SchemaLoader
                 rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
                 files++;
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.count());
+                assertEquals(s.bytesOnDisk(), cfs.metric.totalDiskSpaceUsed.count());
+
             }
         }
-        rewriter.finish();
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+        long sum = 0;
+        for (SSTableReader x : cfs.getSSTables())
+            sum += x.bytesOnDisk();
+        assertEquals(sum, cfs.metric.liveDiskSpaceUsed.count());
+        assertEquals(startStorageMetricsLoad - s.bytesOnDisk() + sum, StorageMetrics.load.count());
+        assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         // tmplink and tmp files should be gone:
+        assertEquals(sum, cfs.metric.totalDiskSpaceUsed.count());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         validateCFS(cfs);
     }
@@ -190,7 +202,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -206,10 +218,10 @@ public class SSTableRewriterTest extends SchemaLoader
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
             }
         }
-        rewriter.finish(false);
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        assertEquals(files, sstables.size());
         assertEquals(files + 1, cfs.getSSTables().size());
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
@@ -226,11 +238,12 @@ public class SSTableRewriterTest extends SchemaLoader
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
+        long startSize = cfs.metric.liveDiskSpaceUsed.count();
         DecoratedKey origFirst = s.first;
         DecoratedKey origLast = s.last;
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -248,6 +261,7 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         rewriter.abort();
         Thread.sleep(1000);
+        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
         assertEquals(1, cfs.getSSTables().size());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
@@ -270,7 +284,7 @@ public class SSTableRewriterTest extends SchemaLoader
         DecoratedKey origLast = s.last;
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -313,7 +327,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -331,7 +345,8 @@ public class SSTableRewriterTest extends SchemaLoader
             if (files == 3)
             {
                 //testing to finish when we have nothing written in the new file
-                rewriter.finish();
+                List<SSTableReader> sstables = rewriter.finish();
+                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
                 break;
             }
         }
@@ -353,7 +368,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -369,7 +384,8 @@ public class SSTableRewriterTest extends SchemaLoader
                 assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
             }
         }
-        rewriter.finish();
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         cfs.truncateBlocking();
@@ -389,7 +405,7 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(1000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         ICompactionScanner scanner = s.getScanner();
@@ -406,8 +422,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 files++;
             }
         }
-        rewriter.finish();
-        assertEquals(files, rewriter.finished().size());
+        List<SSTableReader> sstables = rewriter.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+        assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);