You are viewing a plain text version of this content. The canonical link for it was omitted in this plain-text conversion; see the list archive for the original message.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:06 UTC

[4/7] cassandra git commit: Introduce Transactional API for internal state changes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 3104a96..d39da61 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.*;
+import static org.apache.cassandra.utils.Throwables.maybeFail;
 
 public class SSTableRewriterTest extends SchemaLoader
 {
@@ -93,8 +94,8 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.forceBlockingFlush();
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -104,9 +105,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
                 writer.append(row);
             }
+            Collection<SSTableReader> newsstables = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION);
         }
-        Collection<SSTableReader> newsstables = writer.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
         SSTableDeletingTask.waitForDeletions();
 
         validateCFS(cfs);
@@ -126,8 +127,8 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -137,9 +138,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
                 writer.append(row);
             }
+            Collection<SSTableReader> newsstables = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
         }
-        Collection<SSTableReader> newsstables = writer.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
         SSTableDeletingTask.waitForDeletions();
 
         validateCFS(cfs);
@@ -159,9 +160,9 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         boolean checked = false;
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -191,10 +192,10 @@ public class SSTableRewriterTest extends SchemaLoader
                     }
                 }
             }
+            assertTrue(checked);
+            Collection<SSTableReader> newsstables = writer.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
         }
-        assertTrue(checked);
-        Collection<SSTableReader> newsstables = writer.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
         SSTableDeletingTask.waitForDeletions();
 
         validateCFS(cfs);
@@ -216,17 +217,17 @@ public class SSTableRewriterTest extends SchemaLoader
         for (int i = 0; i < 100; i++)
             cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
         File dir = cfs.directories.getDirectoryForNewSSTables();
-        SSTableWriter writer = getWriter(cfs, dir);
-        try
+
+        try (SSTableWriter writer = getWriter(cfs, dir);)
         {
             for (int i = 0; i < 10000; i++)
                 writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
-            SSTableReader s = writer.openEarly(1000);
+            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
             assert s != null;
             assertFileCounts(dir.list(), 2, 2);
             for (int i = 10000; i < 20000; i++)
                 writer.append(StorageService.getPartitioner().decorateKey(random(i, 10)), cf);
-            SSTableReader s2 = writer.openEarly(1000);
+            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
             assertTrue(s.last.compareTo(s2.last) < 0);
             assertFileCounts(dir.list(), 2, 2);
             s.markObsolete();
@@ -245,11 +246,6 @@ public class SSTableRewriterTest extends SchemaLoader
             assertEquals(datafiles, 0);
             validateCFS(cfs);
         }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
     }
 
     @Test
@@ -264,13 +260,14 @@ public class SSTableRewriterTest extends SchemaLoader
         long startStorageMetricsLoad = StorageMetrics.load.getCount();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
+        List<SSTableReader> sstables;
         int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -284,9 +281,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
                 }
             }
+            sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+            assertEquals(files, sstables.size());
         }
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         long sum = 0;
         for (SSTableReader x : cfs.getSSTables())
             sum += x.bytesOnDisk();
@@ -314,13 +312,14 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
+        List<SSTableReader> sstables;
         int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -331,14 +330,9 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
-        }
-        catch (Throwable t)
-        {
-            rewriter.abort();
-            throw t;
+            sstables = rewriter.finish();
         }
 
-        List<SSTableReader> sstables = rewriter.finish();
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
@@ -445,19 +439,14 @@ public class SSTableRewriterTest extends SchemaLoader
         long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             test.run(scanner, controller, s, cfs, rewriter);
         }
-        catch (Throwable t)
-        {
-            rewriter.abort();
-            throw t;
-        }
 
         SSTableDeletingTask.waitForDeletions();
 
@@ -481,13 +470,13 @@ public class SSTableRewriterTest extends SchemaLoader
 
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -506,11 +495,6 @@ public class SSTableRewriterTest extends SchemaLoader
                 }
             }
         }
-        catch (Throwable t)
-        {
-            rewriter.abort();
-            throw t;
-        }
 
         SSTableDeletingTask.waitForDeletions();
 
@@ -531,13 +515,14 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
+        List<SSTableReader> sstables;
         int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -548,15 +533,11 @@ public class SSTableRewriterTest extends SchemaLoader
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
             }
-        }
-        catch (Throwable t)
-        {
-            rewriter.abort();
-            throw t;
+
+            sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         }
 
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         SSTableDeletingTask.waitForDeletions();
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         cfs.truncateBlocking();
@@ -576,13 +557,14 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(1000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
+        List<SSTableReader> sstables;
         int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -593,15 +575,11 @@ public class SSTableRewriterTest extends SchemaLoader
                     files++;
                 }
             }
-        }
-        catch (Throwable t)
-        {
-            rewriter.abort();
-            throw t;
+
+            sstables = rewriter.finish();
+            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         }
 
-        List<SSTableReader> sstables = rewriter.finish();
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
         SSTableDeletingTask.waitForDeletions();
@@ -668,12 +646,12 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         cfs.getDataTracker().markCompacting(compacting);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
-        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
-        rewriter.switchWriter(w);
-        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
+             ISSTableScanner scanner = compacting.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while (scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -684,7 +662,8 @@ public class SSTableRewriterTest extends SchemaLoader
             }
             try
             {
-                rewriter.finishAndThrow(earlyException);
+                rewriter.throwDuringPrepare(earlyException);
+                rewriter.prepareToCommit();
             }
             catch (Throwable t)
             {
@@ -749,14 +728,14 @@ public class SSTableRewriterTest extends SchemaLoader
         compacting.add(s);
         cfs.getDataTracker().markCompacting(compacting);
 
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+
         SSTableRewriter.overrideOpenInterval(1);
-        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
-        rewriter.switchWriter(w);
         int keyCount = 0;
-        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+             ISSTableScanner scanner = compacting.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while (scanner.hasNext())
             {
                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
@@ -841,11 +820,12 @@ public class SSTableRewriterTest extends SchemaLoader
         File dir = cfs.directories.getDirectoryForNewSSTables();
         String filename = cfs.getTempSSTablePath(dir);
 
-        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
-
-        for (int i = 0; i < count * 5; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        return writer.closeAndOpenReader();
+        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0);)
+        {
+            for (int i = 0; i < count * 5; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+            return writer.finish(true);
+        }
     }
 
     private void validateCFS(ColumnFamilyStore cfs)
@@ -899,7 +879,7 @@ public class SSTableRewriterTest extends SchemaLoader
         return SSTableWriter.create(filename, 0, 0);
     }
 
-    private ByteBuffer random(int i, int size)
+    public static ByteBuffer random(int i, int size)
     {
         byte[] bytes = new byte[size + 4];
         ThreadLocalRandom.current().nextBytes(bytes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index d2559cd..a116b84 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -217,7 +217,7 @@ public class SSTableUtils
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
             SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
             while (appender.append(writer)) { /* pass */ }
-            SSTableReader reader = writer.closeAndOpenReader();
+            SSTableReader reader = writer.finish(true);
             // mark all components for removal
             if (cleanup)
                 for (Component component : reader.components)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index a40034d..0ff4b01 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -128,7 +128,7 @@ public class BufferedRandomAccessFileTest
             assert data[i] == 0;
         }
 
-        w.close();
+        w.finish();
         r.close();
     }
 
@@ -153,7 +153,7 @@ public class BufferedRandomAccessFileTest
         assert negone == -1 : "We read past the end of the file, should have gotten EOF -1. Instead, " + negone;
 
         r.close();
-        w.close();
+        w.finish();
     }
 
     @Test
@@ -178,7 +178,7 @@ public class BufferedRandomAccessFileTest
         w.write(biggerThenBuffer);
         assertEquals(biggerThenBuffer.length + lessThenBuffer.length, w.length());
 
-        w.close();
+        w.finish();
 
         // will use cachedlength
         RandomAccessReader r = RandomAccessReader.open(tmpFile);
@@ -223,7 +223,7 @@ public class BufferedRandomAccessFileTest
             }
         });
 
-        w.close();
+        w.finish();
         r.close();
     }
 
@@ -233,7 +233,7 @@ public class BufferedRandomAccessFileTest
         SequentialWriter w = createTempFile("brafSeek");
         byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
         w.write(data);
-        w.close();
+        w.finish();
 
         final RandomAccessReader file = RandomAccessReader.open(w);
 
@@ -272,7 +272,7 @@ public class BufferedRandomAccessFileTest
     {
         SequentialWriter w = createTempFile("brafSkipBytes");
         w.write(generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2));
-        w.close();
+        w.finish();
 
         RandomAccessReader file = RandomAccessReader.open(w);
 
@@ -320,7 +320,7 @@ public class BufferedRandomAccessFileTest
         r.read(new byte[4]);
         assertEquals(r.getFilePointer(), 20);
 
-        w.close();
+        w.finish();
         r.close();
     }
 
@@ -329,7 +329,7 @@ public class BufferedRandomAccessFileTest
     {
         SequentialWriter file = createTempFile("brafGetPath");
         assert file.getPath().contains("brafGetPath");
-        file.close();
+        file.finish();
     }
 
     @Test
@@ -411,7 +411,7 @@ public class BufferedRandomAccessFileTest
         r.skipBytes(10);
         assertEquals(r.bytesRemaining(), r.length() - 10);
 
-        w.close();
+        w.finish();
         r.close();
     }
 
@@ -443,7 +443,7 @@ public class BufferedRandomAccessFileTest
         byte[] data = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE + 20);
 
         w.write(data);
-        w.close(); // will flush
+        w.finish();
 
         final RandomAccessReader r = RandomAccessReader.open(new File(w.getPath()));
 
@@ -481,7 +481,7 @@ public class BufferedRandomAccessFileTest
         SequentialWriter w = createTempFile("brafTestMark");
         w.write(new byte[30]);
 
-        w.close();
+        w.finish();
 
         RandomAccessReader file = RandomAccessReader.open(w);
 
@@ -542,10 +542,10 @@ public class BufferedRandomAccessFileTest
         SequentialWriter w2 = createTempFile("fscache2");
 
         w1.write(new byte[30]);
-        w1.close();
+        w1.finish();
 
         w2.write(new byte[30]);
-        w2.close();
+        w2.finish();
 
         for (int i = 0; i < 20; i++)
         {
@@ -652,7 +652,7 @@ public class BufferedRandomAccessFileTest
 
         assertEquals(new String(content), "cccccccccc");
 
-        file.close();
+        file.finish();
         copy.close();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
new file mode 100644
index 0000000..9731a8d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+
+import junit.framework.Assert;
+
+public class ChecksummedSequentialWriterTest extends SequentialWriterTest
+{
+
+    private final List<TestableCSW> writers = new ArrayList<>();
+
+    @After
+    public void cleanup()
+    {
+        for (TestableSW sw : writers)
+            sw.file.delete();
+        writers.clear();
+    }
+
+    protected TestableTransaction newTest() throws IOException
+    {
+        TestableCSW sw = new TestableCSW();
+        writers.add(sw);
+        return sw;
+    }
+
+    private static class TestableCSW extends TestableSW
+    {
+        final File crcFile;
+
+        private TestableCSW() throws IOException
+        {
+            this(tempFile("compressedsequentialwriter"),
+                 tempFile("compressedsequentialwriter.checksum"));
+        }
+
+        private TestableCSW(File file, File crcFile) throws IOException
+        {
+            this(file, crcFile, new ChecksummedSequentialWriter(file, BUFFER_SIZE, crcFile));
+        }
+
+        private TestableCSW(File file, File crcFile, SequentialWriter sw) throws IOException
+        {
+            super(file, sw);
+            this.crcFile = crcFile;
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            super.assertInProgress();
+            Assert.assertTrue(crcFile.exists());
+            Assert.assertEquals(0, crcFile.length());
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            super.assertPrepared();
+            Assert.assertTrue(crcFile.exists());
+            Assert.assertFalse(0 == crcFile.length());
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            super.assertAborted();
+            Assert.assertFalse(crcFile.exists());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index b15da47..ec280fa 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -180,6 +180,7 @@ public class DataOutputTest
         DataOutputStreamPlus write = new WrappedDataOutputStreamPlus(writer);
         DataInput canon = testWrite(write);
         write.flush();
+        writer.finish();
         write.close();
         DataInputStream test = new DataInputStream(new FileInputStream(file));
         testRead(test, canon);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
new file mode 100644
index 0000000..ef52030
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -0,0 +1,117 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.After;
+
+import junit.framework.Assert;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+
+import static org.apache.commons.io.FileUtils.*;
+
+public class SequentialWriterTest extends AbstractTransactionalTest
+{
+
+    private final List<TestableSW> writers = new ArrayList<>();
+
+    @After
+    public void cleanup()
+    {
+        for (TestableSW sw : writers)
+            sw.file.delete();
+        writers.clear();
+    }
+
+    protected TestableTransaction newTest() throws IOException
+    {
+        TestableSW sw = new TestableSW();
+        writers.add(sw);
+        return sw;
+    }
+
+    protected static class TestableSW extends TestableTransaction
+    {
+        protected static final int BUFFER_SIZE = 8 << 10;
+        protected final File file;
+        protected final SequentialWriter writer;
+        protected final byte[] fullContents, partialContents;
+
+        protected TestableSW() throws IOException
+        {
+            this(tempFile("sequentialwriter"));
+        }
+
+        protected TestableSW(File file) throws IOException
+        {
+            this(file, new SequentialWriter(file, 8 << 10, true));
+        }
+
+        protected TestableSW(File file, SequentialWriter sw) throws IOException
+        {
+            super(sw);
+            this.file = file;
+            this.writer = sw;
+            fullContents = new byte[BUFFER_SIZE + BUFFER_SIZE / 2];
+            ThreadLocalRandom.current().nextBytes(fullContents);
+            partialContents = Arrays.copyOf(fullContents, BUFFER_SIZE);
+            sw.write(fullContents);
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            Assert.assertTrue(file.exists());
+            byte[] bytes = readFileToByteArray(file);
+            Assert.assertTrue(Arrays.equals(partialContents, bytes));
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            Assert.assertTrue(file.exists());
+            byte[] bytes = readFileToByteArray(file);
+            Assert.assertTrue(Arrays.equals(fullContents, bytes));
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            Assert.assertFalse(writer.isOpen());
+            Assert.assertFalse(file.exists());
+        }
+
+        protected void assertCommitted() throws Exception
+        {
+            assertPrepared();
+            Assert.assertFalse(writer.isOpen());
+        }
+
+        protected static File tempFile(String prefix)
+        {
+            File file = FileUtils.createTempFile(prefix, "test");
+            file.delete();
+            return file;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 016deb3..20371c3 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -76,7 +76,7 @@ public class CompressedInputStreamTest
             index.put(l, writer.getFilePointer());
             writer.stream.writeLong(l);
         }
-        writer.close();
+        writer.finish();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 7e59207..bc73c83 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -109,7 +109,7 @@ public class SSTableExportTest
         writer.append(Util.dk("rowB"), cfamily);
         cfamily.clear();
 
-        writer.closeAndOpenReader();
+        writer.finish(true);
 
         // Enumerate and verify
         File temp = File.createTempFile("Standard1", ".txt");
@@ -153,7 +153,7 @@ public class SSTableExportTest
         writer.append(Util.dk("rowExclude"), cfamily);
         cfamily.clear();
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
 
         // Export to JSON and verify
         File tempJson = File.createTempFile("Standard1", ".json");
@@ -202,7 +202,7 @@ public class SSTableExportTest
         writer.append(Util.dk("rowExclude"), cfamily);
         cfamily.clear();
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
 
         // Export to JSON and verify
         File tempJson = File.createTempFile("Standard1", ".json");
@@ -237,7 +237,7 @@ public class SSTableExportTest
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
 
         // Export to JSON and verify
         File tempJson = File.createTempFile("Counter1", ".json");
@@ -268,7 +268,7 @@ public class SSTableExportTest
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
 
         // Export to JSON and verify
         File tempJson = File.createTempFile("ValuesWithQuotes", ".json");
@@ -300,7 +300,7 @@ public class SSTableExportTest
         cfamily.delete(new DeletionInfo(0, 0));
         writer.append(Util.dk("rowA"), cfamily);
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
         // Export to JSON and verify
         File tempJson = File.createTempFile("CFWithDeletionInfo", ".json");
         SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new String[0]);
@@ -359,7 +359,7 @@ public class SSTableExportTest
         cfamily.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L));
         writer.append(Util.dk(ByteBufferUtil.bytes(UUIDGen.getTimeUUID())), cfamily);
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
         // Export to JSON and verify
         File tempJson = File.createTempFile("CFWithColumnNameEqualToDefaultKeyAlias", ".json");
         SSTableExport.export(reader, new PrintStream(tempJson.getPath()), new String[0]);
@@ -388,7 +388,7 @@ public class SSTableExportTest
         cfamily.addColumn(column("column", "value", 1L));
         writer.append(Util.dk("key", AsciiType.instance), cfamily);
 
-        SSTableReader reader = writer.closeAndOpenReader();
+        SSTableReader reader = writer.finish(true);
         // Export to JSON and verify
         File tempJson = File.createTempFile("CFWithAsciiKeys", ".json");
         SSTableExport.export(reader,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
new file mode 100644
index 0000000..4e160c2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+/**
+ * Base contract test for implementations of the {@code Transactional} API
+ * (the prepare/commit/abort/close lifecycle introduced by this commit).
+ * Concrete subclasses supply a {@link TestableTransaction} wrapping a real
+ * Transactional object plus state-inspection hooks; the tests here drive the
+ * lifecycle and verify the expected state after each transition.
+ *
+ * NOTE(review): uses the deprecated JUnit 3 {@code junit.framework.Assert};
+ * {@code org.junit.Assert} would be the modern choice — left as committed.
+ */
+@Ignore
+public abstract class AbstractTransactionalTest
+{
+
+    // Factory for a fresh transaction-under-test; each test case creates its own.
+    protected abstract TestableTransaction newTest() throws Exception;
+
+    /**
+     * Verifies that closing (or aborting) a transaction that was never
+     * prepared results in an abort, and that abort(null) returns null
+     * (no accumulated Throwable).
+     */
+    @Test
+    public void testNoPrepare() throws Exception
+    {
+        TestableTransaction txn;
+
+        // close() without prepareToCommit() must abort the transaction
+        txn = newTest();
+        txn.assertInProgress();
+        txn.testing.close();
+        txn.assertAborted();
+
+        // explicit abort(null) must also abort, and return no Throwable
+        txn = newTest();
+        txn.assertInProgress();
+        Assert.assertNull(txn.testing.abort(null));
+        txn.assertAborted();
+    }
+
+    /**
+     * Verifies that a prepared-but-uncommitted transaction is still rolled
+     * back, whether torn down via close() or an explicit abort(null).
+     */
+    @Test
+    public void testPrepare() throws Exception
+    {
+        TestableTransaction txn;
+        // prepare then close() (no commit) must abort
+        txn = newTest();
+        txn.assertInProgress();
+        txn.testing.prepareToCommit();
+        txn.assertPrepared();
+        txn.testing.close();
+        txn.assertAborted();
+
+        // prepare then explicit abort(null) must abort and return null
+        txn = newTest();
+        txn.assertInProgress();
+        txn.testing.prepareToCommit();
+        txn.assertPrepared();
+        Assert.assertNull(txn.testing.abort(null));
+        txn.assertAborted();
+    }
+
+    /**
+     * Verifies the happy path: prepare + commit succeeds, the committed state
+     * survives close(), and a late abort is rejected with an
+     * IllegalStateException (returned, not thrown) while leaving the
+     * committed state intact.
+     */
+    @Test
+    public void testCommit() throws Exception
+    {
+        TestableTransaction txn = newTest();
+        txn.assertInProgress();
+        txn.testing.prepareToCommit();
+        txn.assertPrepared();
+        Assert.assertNull(txn.testing.commit(null));
+        txn.assertCommitted();
+        // close() after commit is a no-op: state remains committed
+        txn.testing.close();
+        txn.assertCommitted();
+        // abort after commit reports the misuse via the returned Throwable
+        Throwable t = txn.testing.abort(null);
+        Assert.assertTrue(t instanceof IllegalStateException);
+        txn.assertCommitted();
+    }
+
+    /**
+     * Verifies the Throwable-accumulator contract: a Throwable passed into
+     * commit()/abort() is returned (for chaining), and the illegal
+     * abort-after-commit is attached to it as a suppressed exception.
+     */
+    @Test
+    public void testThrowableReturn() throws Exception
+    {
+        TestableTransaction txn;
+        txn = newTest();
+        Throwable t = new RuntimeException();
+        txn.testing.prepareToCommit();
+        // the accumulator passed in is the accumulator returned
+        Assert.assertEquals(t, txn.testing.commit(t));
+        Assert.assertEquals(t, txn.testing.abort(t));
+        // abort-after-commit surfaced as a suppressed IllegalStateException on t
+        Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException);
+    }
+
+    /**
+     * Verifies that commit() without a preceding prepareToCommit() throws
+     * IllegalStateException and leaves the transaction state untouched,
+     * both while in progress and after an abort.
+     * NOTE(review): Assert.assertTrue(false) is a hand-rolled Assert.fail().
+     */
+    @Test
+    public void testBadCommit() throws Exception
+    {
+        TestableTransaction txn;
+        txn = newTest();
+        try
+        {
+            // commit before prepare must be rejected
+            txn.testing.commit(null);
+            Assert.assertTrue(false);
+        }
+        catch (IllegalStateException t)
+        {
+        }
+        // the failed commit must not have advanced the state
+        txn.assertInProgress();
+        Assert.assertNull(txn.testing.abort(null));
+        txn.assertAborted();
+        try
+        {
+            // commit after abort must likewise be rejected
+            txn.testing.commit(null);
+            Assert.assertTrue(false);
+        }
+        catch (IllegalStateException t)
+        {
+        }
+        txn.assertAborted();
+    }
+
+
+    /**
+     * Wraps the Transactional instance under test and exposes assertion
+     * hooks that subclasses implement to inspect the underlying resource
+     * (e.g. file contents) at each lifecycle stage.
+     */
+    public static abstract class TestableTransaction
+    {
+        // the transaction whose lifecycle the outer tests drive
+        final Transactional testing;
+        public TestableTransaction(Transactional transactional)
+        {
+            this.testing = transactional;
+        }
+
+        // state checks, one per lifecycle stage; implementations assert on
+        // the externally visible effects of the wrapped Transactional
+        protected abstract void assertInProgress() throws Exception;
+        protected abstract void assertPrepared() throws Exception;
+        protected abstract void assertAborted() throws Exception;
+        protected abstract void assertCommitted() throws Exception;
+    }
+}