You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/04 16:04:59 UTC

[3/5] cassandra git commit: Make SSTableRewriter.abort() more robust to failure

Make SSTableRewriter.abort() more robust to failure

patch by benedict; reviewed by branimir for CASSANDRA-8832


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c3fefa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c3fefa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c3fefa0

Branch: refs/heads/cassandra-2.1
Commit: 3c3fefa04f027a1ecbede1d41c1bf3df25218a5d
Parents: 33a9ada
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 14:59:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 14:59:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   4 +-
 .../db/compaction/CompactionManager.java        |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |   3 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  96 +++++++++++---
 .../org/apache/cassandra/utils/Throwables.java  |  32 +++++
 .../apache/cassandra/utils/concurrent/Refs.java |  25 +++-
 .../utils/concurrent/SelfRefCounted.java        |  24 ++++
 .../io/sstable/SSTableRewriterTest.java         | 124 +++++++++++--------
 10 files changed, 238 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4992d85..6133536 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
  * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
  * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
  * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9b792b6..38c5dbe 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import javax.management.openmbean.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.*;
+import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8224311..81964f9 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.utils.Interval;
 import org.apache.cassandra.utils.IntervalTree;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
 
 public class DataTracker
 {
@@ -406,8 +407,7 @@ public class DataTracker
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
 
-        for (SSTableReader sstable : oldSSTables)
-            sstable.selfRef().release();
+        Refs.release(Refs.selfRefs(oldSSTables));
     }
 
     private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e54a25f..992378f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1271,7 +1271,10 @@ public class CompactionManager implements CompactionManagerMBean
                 if (t instanceof CompactionInterruptedException)
                 {
                     logger.info(t.getMessage());
-                    logger.debug("Full interruption stack trace:", t);
+                    if (t.getSuppressed() != null && t.getSuppressed().length > 0)
+                        logger.warn("Interruption of compaction encountered exceptions:", t);
+                    else
+                        logger.debug("Full interruption stack trace:", t);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 13abc04..973b0c9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -107,6 +107,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -166,7 +167,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
  *
  * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
  */
-public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
+public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2ca3e6f..be1085b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -21,6 +21,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +33,8 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
  * we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully
@@ -84,6 +87,12 @@ public class SSTableRewriter
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+    private State state = State.WORKING;
+
+    private static enum State
+    {
+        WORKING, FINISHED, ABORTED
+    }
 
     public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
@@ -176,30 +185,79 @@ public class SSTableRewriter
 
     public void abort()
     {
-        switchWriter(null, true);
-        moveStarts(null, null, true);
+        switch (state)
+        {
+            case ABORTED:
+                return;
+            case FINISHED:
+                throw new IllegalStateException("Cannot abort - changes have already been committed");
+        }
+        state = State.ABORTED;
+
+        Throwable fail = null;
+        try
+        {
+            moveStarts(null, null, true);
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
 
         // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
-            sstable.markObsolete();
-            sstable.selfRef().release();
+            try
+            {
+                sstable.markObsolete();
+                sstable.selfRef().release();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
         }
 
+        if (writer != null)
+            finishedEarly.add(new Finished(writer, currentlyOpenedEarly));
+
         // abort the writers
         for (Finished finished : finishedEarly)
         {
-            boolean opened = finished.reader != null;
-            finished.writer.abort();
-            if (opened)
+            try
+            {
+                finished.writer.abort();
+            }
+            catch (Throwable t)
             {
-                // if we've already been opened, add ourselves to the discard pile
-                discard.add(finished.reader);
-                finished.reader.markObsolete();
+                fail = merge(fail, t);
+            }
+            try
+            {
+                if (finished.reader != null)
+                {
+                    // if we've already been opened, add ourselves to the discard pile
+                    discard.add(finished.reader);
+                    finished.reader.markObsolete();
+                }
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
             }
         }
 
-        replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        try
+        {
+            replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
+
+        if (fail != null)
+            throw Throwables.propagate(fail);
     }
 
     /**
@@ -301,11 +359,6 @@ public class SSTableRewriter
 
     public void switchWriter(SSTableWriter newWriter)
     {
-        switchWriter(newWriter, false);
-    }
-
-    private void switchWriter(SSTableWriter newWriter, boolean abort)
-    {
         if (writer == null)
         {
             writer = newWriter;
@@ -313,7 +366,7 @@ public class SSTableRewriter
         }
 
         // we leave it as a tmp file, but we open it and add it to the dataTracker
-        if (writer.getFilePointer() != 0 && !abort)
+        if (writer.getFilePointer() != 0)
         {
             SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -362,8 +415,14 @@ public class SSTableRewriter
 
     private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
     {
+        switch (state)
+        {
+            case FINISHED: case ABORTED:
+                throw new IllegalStateException("Cannot finish - changes have already been " + state.toString().toLowerCase());
+        }
+
         List<SSTableReader> newReaders = new ArrayList<>();
-        switchWriter(null, false);
+        switchWriter(null);
 
         if (throwEarly)
             throw new RuntimeException("exception thrown early in finish, for testing");
@@ -396,6 +455,7 @@ public class SSTableRewriter
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
 
         replaceWithFinishedReaders(newReaders);
+        state = State.FINISHED;
         return finished;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
new file mode 100644
index 0000000..552ca87
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+public class Throwables
+{
+
+    public static Throwable merge(Throwable existingFail, Throwable newFail)
+    {
+        if (existingFail == null)
+            return newFail;
+        existingFail.addSuppressed(newFail);
+        return existingFail;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index 3a930d2..b24fc2f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -2,9 +2,15 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * A collection of managed Ref references to RefCounted objects, and the objects they are referencing.
  * Care MUST be taken when using this collection, as if a permanent reference to it leaks we will not
@@ -196,7 +202,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
         throw new IllegalStateException();
     }
 
-    private static void release(Iterable<? extends Ref<?>> refs)
+    public static void release(Iterable<? extends Ref<?>> refs)
     {
         Throwable fail = null;
         for (Ref ref : refs)
@@ -207,13 +213,22 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
             }
             catch (Throwable t)
             {
-                if (fail == null)
-                    fail = t;
-                else
-                    fail.addSuppressed(t);
+                fail = merge(fail, t);
             }
         }
         if (fail != null)
             throw Throwables.propagate(fail);
     }
+
+    public static <T extends SelfRefCounted<T>> Iterable<Ref<T>> selfRefs(Iterable<T> refs)
+    {
+        return Iterables.transform(refs, new Function<T, Ref<T>>()
+        {
+            @Nullable
+            public Ref<T> apply(T t)
+            {
+                return t.selfRef();
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
new file mode 100644
index 0000000..cb45757
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
@@ -0,0 +1,24 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+public interface SelfRefCounted<T extends SelfRefCounted<T>> extends RefCounted<T>
+{
+    public Ref<T> selfRef();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4957e5a..258b6b5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,6 +21,9 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
@@ -301,48 +304,83 @@ public class SSTableRewriterTest extends SchemaLoader
     @Test
     public void testNumberOfFiles_abort() throws Exception
     {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                }
+                rewriter.abort();
+            }
+        });
+    }
 
-        SSTableReader s = writeFile(cfs, 1000);
-        cfs.addSSTable(s);
-        long startSize = cfs.metric.liveDiskSpaceUsed.count();
-        DecoratedKey origFirst = s.first;
-        DecoratedKey origLast = s.last;
-        Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+    @Test
+    public void testNumberOfFiles_abort2() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                    if (files == 3)
+                    {
+                        //testing to abort when we have nothing written in the new file
+                        rewriter.abort();
+                        break;
+                    }
+                }
+            }
+        });
+    }
 
-        int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+    @Test
+    public void testNumberOfFiles_abort3() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
         {
-            while(scanner.hasNext())
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
             {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                int files = 1;
+                while(scanner.hasNext())
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
                 }
+                rewriter.abort();
             }
-        }
-        rewriter.abort();
-        Thread.sleep(1000);
-        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
-        assertEquals(1, cfs.getSSTables().size());
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
-        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
-        validateCFS(cfs);
+        });
+    }
 
+    private static interface RewriterTest
+    {
+        public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter);
     }
 
-    @Test
-    public void testNumberOfFiles_abort2() throws Exception
+    private void testNumberOfFiles_abort(RewriterTest test) throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -353,36 +391,22 @@ public class SSTableRewriterTest extends SchemaLoader
 
         DecoratedKey origFirst = s.first;
         DecoratedKey origLast = s.last;
+        long startSize = cfs.metric.liveDiskSpaceUsed.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        int files = 1;
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            while(scanner.hasNext())
-            {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-                {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-                }
-                if (files == 3)
-                {
-                    //testing to abort when we have nothing written in the new file
-                    rewriter.abort();
-                    break;
-                }
-            }
+            test.run(scanner, controller, s, cfs, rewriter);
         }
+
         Thread.sleep(1000);
+        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
         assertEquals(1, cfs.getSSTables().size());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
-
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
         validateCFS(cfs);